/*
 * Decompiled with CFR 0.152.
 */
package com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk;

import com.dremio.jdbc.shaded.com.dremio.common.AutoCloseables;
import com.dremio.jdbc.shaded.com.dremio.common.concurrent.CloseableSchedulerThreadPool;
import com.dremio.jdbc.shaded.com.dremio.common.concurrent.NamedThreadFactory;
import com.dremio.jdbc.shaded.com.dremio.exec.proto.CoordinationProtos;
import com.dremio.jdbc.shaded.com.dremio.options.TypeValidators;
import com.dremio.jdbc.shaded.com.dremio.service.Service;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.CoordinatorLostHandle;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.DistributedSemaphore;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.ElectionListener;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.ElectionRegistrationHandle;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.LinearizableHierarchicalStore;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.ObservableConnectionLostHandler;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.BoundedExponentialDelay;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ServiceInstanceHelper;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKClusterConfig;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKElectionListener;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKLinearizableStore;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKServiceSet;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZkDistributedSemaphore;
import com.dremio.jdbc.shaded.com.dremio.telemetry.api.metrics.SimpleCounter;
import com.dremio.jdbc.shaded.com.google.common.annotations.VisibleForTesting;
import com.dremio.jdbc.shaded.com.google.common.base.Preconditions;
import com.dremio.jdbc.shaded.com.google.common.base.Throwables;
import com.dremio.jdbc.shaded.com.google.common.util.concurrent.FutureCallback;
import com.dremio.jdbc.shaded.com.google.common.util.concurrent.Futures;
import com.dremio.jdbc.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.dremio.jdbc.shaded.com.google.common.util.concurrent.ListeningExecutorService;
import com.dremio.jdbc.shaded.com.google.common.util.concurrent.MoreExecutors;
import com.dremio.jdbc.shaded.org.apache.curator.framework.CuratorFramework;
import com.dremio.jdbc.shaded.org.apache.curator.framework.CuratorFrameworkFactory;
import com.dremio.jdbc.shaded.org.apache.curator.framework.api.ChildrenDeletable;
import com.dremio.jdbc.shaded.org.apache.curator.framework.recipes.leader.LeaderLatch;
import com.dremio.jdbc.shaded.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import com.dremio.jdbc.shaded.org.apache.curator.framework.recipes.leader.Participant;
import com.dremio.jdbc.shaded.org.apache.curator.framework.state.ConnectionState;
import com.dremio.jdbc.shaded.org.apache.curator.framework.state.ConnectionStateListener;
import com.dremio.jdbc.shaded.org.apache.curator.utils.ZookeeperCompatibility;
import com.dremio.jdbc.shaded.org.apache.curator.utils.ZookeeperFactory;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.ServiceDiscovery;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import com.dremio.jdbc.shaded.org.apache.zookeeper.Watcher;
import com.dremio.jdbc.shaded.org.apache.zookeeper.ZooKeeper;
import com.dremio.jdbc.shaded.org.slf4j.Logger;
import com.dremio.jdbc.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Provider;

public class ZKClusterClient
implements Service {
    private static final Logger logger = LoggerFactory.getLogger(ZKClusterClient.class);
    // Compatibility settings handed to the Curator builder in start(); persistent watchers are disabled.
    private static final ZookeeperCompatibility ZK_35_COMPATIBILITY = ZookeeperCompatibility.builder().hasPersistentWatchers(false).build();
    // Splits "<hosts>/<optional middle path>/<last segment>": group 1 is everything before the first '/',
    // group 2 the optional middle path (used as the ZK root), group 3 the last segment (used as the cluster id).
    private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^[^/]*?)/(?:(.*)/)?([^/]*)$");
    // Configuration key name; it is only declared here and not read anywhere in this class.
    public static final String ZK_LOST_HANDLER_MODULE_CLASS = "dremio.coordinator_lost_handle.module.class";
    // Counters bumped by the supervisor probe (runSupervisorCheck) and by ConnectionListener.
    private static final SimpleCounter ZK_SUPERVISOR_FAILED_COUNTER = SimpleCounter.of("zk_cluster_client.supervisor_probe_failed", "Number of failed attempts to probe the ZK supervisor");
    private static final SimpleCounter ZK_SUPERVISOR_EXIT_APP_COUNTER = SimpleCounter.of("zk_cluster_client.supervisor_exit_application", "Number of times the application was terminated because of the ZK supervisor probe failures");
    private static final SimpleCounter ZK_SESSION_LOST_COUNTER = SimpleCounter.of("zk_cluster_client.session_lost", "Number of times connection to ZK was lost");
    private static final SimpleCounter ZK_SESSION_SUSPENDED_COUNTER = SimpleCounter.of("zk_cluster_client.session_suspended", "Number of times connection to ZK was suspended");
    private static final SimpleCounter ZK_RECONNECTED_COUNTER = SimpleCounter.of("zk_cluster_client.session_recovered", "Number of times connection to ZK was successfully reestablished");
    // Cluster id taken from the connect string when it matches ZK_COMPLEX_STRING, otherwise from the config.
    private final String clusterId;
    // Released on the first CONNECTED state (InitialConnectionListener) and on close().
    private final CountDownLatch initialConnection = new CountDownLatch(1);
    // Runs the leader-election follow-up tasks submitted from joinElection().
    private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("zk-curator-")));
    // Both are created in start().
    private CuratorFramework curator;
    private ServiceDiscovery<CoordinationProtos.NodeEndpoint> discovery;
    private ZKClusterConfig config;
    private final ZookeeperFactory zkFactory;
    // Connect string as passed to the constructor (may be null); the effective value is computed in start().
    private final String connect;
    private String connectionString;
    // When non-null, start() connects to "localhost:<port>" using this provider.
    private Provider<Integer> localPortProvider;
    private volatile boolean closed = false;
    private final CoordinatorLostHandle connectionLostHandler;
    // null until the first connection-state event. Written by ConnectionListener on Curator's listener
    // thread and read by the supervisor probe and by the delete helpers on other threads, hence volatile.
    private volatile Boolean isConnected;
    private final String executorZkSupervisor = "coordinator-zk-supervisor-";
    // Single-threaded scheduler that periodically triggers runSupervisorCheck().
    private final ScheduledExecutorService scheduleExecutorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("coordinator-zk-supervisor-"));
    // Runs the actual ZK read of each probe so the scheduler thread can bound it with a timeout.
    private final ExecutorService executorServiceSupervisor = Executors.newSingleThreadExecutor();
    // Consecutive failed probes; only touched from runSupervisorCheck().
    private int currentNumberOfSupervisorProbeFailures = 0;
    // "/" + clusterId; base path for discovery, semaphores and leader latches.
    private final String clusterIdPath;
    // Feature switch source; the supervisor probe only runs when COORDINATOR_ZK_SUPERVISOR tests true.
    private final Predicate<TypeValidators.BooleanValidator> featureEvaluator;

    /**
     * Creates a client for the given connect string, using a fresh {@link ZKClientFactory}.
     *
     * @param config  cluster configuration
     * @param connect connect string; may be null or empty, in which case {@code start()} falls back to the config
     */
    public ZKClusterClient(ZKClusterConfig config, String connect) throws IOException {
        this(config, connect, null, new ZKClientFactory());
    }

    /**
     * Creates a client for the given connect string with a caller-supplied ZooKeeper factory.
     *
     * @param config    cluster configuration
     * @param connect   connect string; may be null or empty
     * @param zkFactory factory handed to Curator for creating ZooKeeper handles
     */
    public ZKClusterClient(ZKClusterConfig config, String connect, ZookeeperFactory zkFactory) throws IOException {
        this(config, connect, null, zkFactory);
    }

    /**
     * Creates a client that connects to {@code localhost} on the port supplied by {@code localPort}.
     *
     * @param config    cluster configuration
     * @param localPort provider of the local ZooKeeper port, queried in {@code start()}
     */
    public ZKClusterClient(ZKClusterConfig config, Provider<Integer> localPort) throws IOException {
        this(config, null, localPort, new ZKClientFactory());
    }

    /**
     * Common constructor. Stores the collaborators, resolves the cluster id and picks the
     * connection-lost handler. No connection is opened here.
     */
    private ZKClusterClient(ZKClusterConfig config, String connect, Provider<Integer> localPort, ZookeeperFactory zkFactory) throws IOException {
        this.featureEvaluator = config.getFeatureEvaluator();
        this.localPortProvider = localPort;
        this.connect = connect;
        this.config = config;
        this.zkFactory = zkFactory;

        // The last path segment of a "host/.../id" connect string overrides the configured cluster id.
        String resolvedClusterId = config.getClusterId();
        if (connect != null) {
            Matcher complexConnect = ZK_COMPLEX_STRING.matcher(connect);
            if (complexConnect.matches()) {
                resolvedClusterId = complexConnect.group(3);
            }
        }
        this.clusterId = resolvedClusterId;

        if (config.isConnectionHandleEnabled()) {
            this.connectionLostHandler = config.getConnectionLostHandler();
        } else {
            this.connectionLostHandler = ObservableConnectionLostHandler.OBSERVABLE_LOST_HANDLER.get();
        }
        this.clusterIdPath = "/" + this.clusterId;
    }

    /**
     * Builds and starts the Curator client and service discovery, schedules the periodic
     * supervisor probe, then waits for the initial connection.
     *
     * <p>When retries are limited and no connection arrives within the configured initial
     * timeout, the connection-lost handler is invoked with {@code LOST}. Otherwise this
     * method blocks until the initial-connection latch is released.
     *
     * @throws Exception if Curator, discovery or the wait fails
     */
    @Override
    public void start() throws Exception {
        // Precedence: local port provider, then explicit connect string, then configuration.
        if (this.localPortProvider != null) {
            this.connectionString = "localhost:" + String.valueOf(this.localPortProvider.get());
        } else if (this.connect == null || this.connect.isEmpty()) {
            this.connectionString = this.config.getConnection();
        } else {
            this.connectionString = this.connect;
        }

        // A "host/root/id" style string carries its own host list and root path.
        String zkRoot = this.config.getRoot();
        if (this.connectionString != null) {
            Matcher complexConnect = ZK_COMPLEX_STRING.matcher(this.connectionString);
            if (complexConnect.matches()) {
                this.connectionString = complexConnect.group(1);
                zkRoot = complexConnect.group(2);
            }
        }
        logger.info("Connect: {}, zkRoot: {}, clusterId: {}", this.connectionString, zkRoot, this.clusterId);

        BoundedExponentialDelay retryPolicy = new BoundedExponentialDelay(
                this.config.getRetryBaseDelayMilliSecs(),
                this.config.getRetryMaxDelayMilliSecs(),
                this.config.isRetryUnlimited(),
                this.config.getRetryLimit());
        this.curator = CuratorFrameworkFactory.builder()
                .namespace(zkRoot)
                .connectionTimeoutMs(this.config.getConnectionTimeoutMilliSecs())
                .sessionTimeoutMs(this.config.getSessionTimeoutMilliSecs())
                .maxCloseWaitMs(this.config.getRetryMaxDelayMilliSecs())
                .retryPolicy(retryPolicy)
                .connectString(this.connectionString)
                .zookeeperFactory(this.zkFactory)
                .zookeeperCompatibility(ZK_35_COMPATIBILITY)
                .build();
        this.curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
        this.curator.getConnectionStateListenable().addListener(new ConnectionListener());
        this.curator.start();

        this.discovery = this.newDiscovery(this.clusterIdPath);
        logger.info("Starting ZKClusterClient, ZK_TIMEOUT:{}, ZK_SESSION_TIMEOUT:{}, ZK_RETRY_MAX_DELAY:{}, ZK_RETRY_UNLIMITED:{}, ZK_RETRY_LIMIT:{}, CONNECTION_HANDLE_ENABLED:{}, SUPERVISOR_INTERVAL:{}, SUPERVISOR_READ_TIMEOUT:{}, SUPERVISOR_MAX_FAILURES:{}", this.config.getConnectionTimeoutMilliSecs(), this.config.getSessionTimeoutMilliSecs(), this.config.getRetryMaxDelayMilliSecs(), this.config.isRetryUnlimited(), this.config.getRetryLimit(), this.config.isConnectionHandleEnabled(), this.config.getZkSupervisorIntervalMilliSec(), this.config.getZkSupervisorReadTimeoutMilliSec(), this.config.getZkSupervisorMaxFailures());
        this.discovery.start();

        // Exceptions are caught inside the task so a failing probe does not stop the schedule.
        Runnable supervisorProbe = () -> {
            try {
                this.runSupervisorCheck();
            }
            catch (Exception e) {
                logger.error("Error in : " + e.getMessage(), e);
            }
        };
        this.scheduleExecutorService.scheduleAtFixedRate(supervisorProbe, this.config.getZkSupervisorIntervalMilliSec(), this.config.getZkSupervisorIntervalMilliSec(), TimeUnit.MILLISECONDS);

        // With unlimited retries the timed wait is skipped and we block until connected (or closed).
        boolean connectedOrUnbounded = this.config.isRetryUnlimited()
                || this.initialConnection.await(this.config.getInitialTimeoutMilliSecs(), TimeUnit.MILLISECONDS);
        if (connectedOrUnbounded) {
            this.initialConnection.await();
        } else {
            logger.info("Failed to get initial connection to ZK");
            this.connectionLostHandler.handleConnectionState(ConnectionState.LOST);
        }
    }

    /**
     * One supervisor probe: verifies that ZooKeeper is reachable by listing the children of the
     * cluster id path, bounded by the configured read timeout.
     *
     * <p>Does nothing unless the {@code COORDINATOR_ZK_SUPERVISOR} feature tests true. A successful
     * probe resets the consecutive-failure count. A failed probe increments it, and once it reaches
     * the configured maximum the JVM is halted with exit status 1.
     *
     * <p>A probe that times out is cancelled so it does not keep occupying the single supervisor
     * worker thread. If the probing thread itself is interrupted, the interrupt flag is restored
     * and the probe is abandoned without being counted as a failure.
     */
    private void runSupervisorCheck() {
        if (this.featureEvaluator == null || !this.featureEvaluator.test(ZKClusterConfig.COORDINATOR_ZK_SUPERVISOR)) {
            return;
        }

        boolean isProbeSucceeded = false;
        if (!Boolean.TRUE.equals(this.isConnected)) {
            logger.error("ZKClusterClient: Not connected to ZK.");
        } else {
            FutureTask<Boolean> checkGetServiceNames = new FutureTask<Boolean>(() -> this.getServiceNames() != null);
            try {
                this.executorServiceSupervisor.execute(checkGetServiceNames);
                if (!checkGetServiceNames.get(this.config.getZkSupervisorReadTimeoutMilliSec(), TimeUnit.MILLISECONDS)) {
                    logger.error("ZKClusterClient: cluster id path {} not found", this.clusterIdPath);
                } else {
                    isProbeSucceeded = true;
                    this.currentNumberOfSupervisorProbeFailures = 0;
                    logger.debug("ZKClusterClient: probe ok to cluster id path {}", this.clusterIdPath);
                }
            }
            catch (TimeoutException e) {
                // Free the supervisor worker; otherwise later probes queue up behind the stuck read.
                checkGetServiceNames.cancel(true);
                logger.error("ZKClusterClient: Timeout while trying to check for zk cluster id path {} ", this.clusterIdPath, e);
            }
            catch (InterruptedException e) {
                // Interrupted while waiting (e.g. the scheduler is being shut down): not a ZK failure.
                checkGetServiceNames.cancel(true);
                Thread.currentThread().interrupt();
                logger.warn("ZKClusterClient: Interrupted while trying to check for zk cluster id path {} ", this.clusterIdPath);
                return;
            }
            catch (Exception e) {
                logger.error("ZKClusterClient: Exception while trying to check for zk cluster id path {} ", this.clusterIdPath, e);
            }
        }

        if (isProbeSucceeded) {
            return;
        }
        ZK_SUPERVISOR_FAILED_COUNTER.increment();
        ++this.currentNumberOfSupervisorProbeFailures;
        if (this.currentNumberOfSupervisorProbeFailures >= this.config.getZkSupervisorMaxFailures()) {
            ZK_SUPERVISOR_EXIT_APP_COUNTER.increment();
            logger.error("ZKClusterClient: max number of failures has reached [{}/{}]. Calling probe action for out of service", this.currentNumberOfSupervisorProbeFailures, this.config.getZkSupervisorMaxFailures());
            // halt() skips shutdown hooks; the process terminates immediately.
            Runtime.getRuntime().halt(1);
        } else {
            logger.warn("ZKClusterClient: probe failed. Attempt[{}] of max[{}]. Next in {} msec", this.currentNumberOfSupervisorProbeFailures, this.config.getZkSupervisorMaxFailures(), this.config.getZkSupervisorIntervalMilliSec());
        }
    }

    /**
     * Returns the ZooKeeper handle currently held by the Curator client.
     *
     * @throws Exception if Curator cannot provide the handle
     */
    @VisibleForTesting
    ZooKeeper getZooKeeperClient() throws Exception {
        ZooKeeper zooKeeper = this.curator.getZookeeperClient().getZooKeeper();
        return zooKeeper;
    }

    /**
     * Returns the effective connect string computed by {@code start()}; null before that.
     */
    @VisibleForTesting
    String getConnectionString() {
        return connectionString;
    }

    /**
     * Replaces the local port provider that {@code start()} consults when building the connect string.
     *
     * @param portProvider new provider; null disables the localhost shortcut
     */
    @VisibleForTesting
    public void setPortProvider(Provider<Integer> portProvider) {
        localPortProvider = portProvider;
    }

    /**
     * Stops the client: releases anyone blocked on the initial connection, closes discovery,
     * Curator and the election executor, then shuts down both supervisor executors.
     *
     * <p>Calls after the first one return immediately. The supervisor executors are shut down
     * even when closing discovery/Curator throws, so their threads are not leaked on a failed close.
     *
     * @throws Exception the failure raised while closing, if any
     */
    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        logger.info("Stopping ZKClusterClient");
        // Unblock a start() call that is still waiting for the first connection.
        this.initialConnection.countDown();
        try {
            AutoCloseables.close(this.discovery, this.curator, CloseableSchedulerThreadPool.of(this.executorService, logger));
        } finally {
            try {
                CloseableSchedulerThreadPool.close(this.scheduleExecutorService, logger);
            } finally {
                CloseableSchedulerThreadPool.close(this.executorServiceSupervisor, logger);
            }
        }
        logger.info("Stopped ZKClusterClient");
    }

    /**
     * Creates a distributed semaphore stored under {@code <clusterIdPath>/semaphore/<name>}.
     *
     * @param name          semaphore name, appended to the semaphore base path
     * @param maximumLeases maximum number of leases
     * @return the semaphore
     * @throws RuntimeException wrapping any exception raised while creating it
     */
    public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
        try {
            final String semaphorePath = this.clusterIdPath + "/semaphore/" + name;
            return new ZkDistributedSemaphore(this.curator, semaphorePath, maximumLeases);
        }
        catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Creates a linearizable store rooted at the leader-latch path. When the connection-lost
     * handler is an {@link ObservableConnectionLostHandler}, the store is attached to it as an observer.
     *
     * @return the newly created store
     */
    public LinearizableHierarchicalStore getHierarchicalStore() {
        final ZKLinearizableStore linearizableStore = new ZKLinearizableStore(this.curator, this.getRootLatchPath());
        if (!(this.connectionLostHandler instanceof ObservableConnectionLostHandler)) {
            logger.info("Connection handle is {}", this.connectionLostHandler.getClass().getSimpleName());
            return linearizableStore;
        }
        logger.info("Attaching connection lost observer");
        ObservableConnectionLostHandler observableHandler = (ObservableConnectionLostHandler) this.connectionLostHandler;
        observableHandler.attachObserver(linearizableStore);
        return linearizableStore;
    }

    /**
     * Lists the child node names directly under the cluster id path.
     *
     * @return the child names as reported by Curator
     * @throws Exception if the ZooKeeper read fails
     */
    public Iterable<String> getServiceNames() throws Exception {
        List<String> childNames = this.curator.getChildren().forPath(this.clusterIdPath);
        return childNames;
    }

    public ElectionRegistrationHandle joinElection(final String name, final ElectionListener listener) {
        final String id = UUID.randomUUID().toString();
        final String latchPath = this.getRootLatchPath() + name;
        final LeaderLatch leaderLatch = new LeaderLatch(this.curator, latchPath, id, LeaderLatch.CloseMode.SILENT);
        logger.info("joinElection called {} - {}.", (Object)id, (Object)name);
        final AtomicReference newLeaderRef = new AtomicReference();
        final AtomicLong leaderElectedGeneration = new AtomicLong(0L);
        leaderLatch.addListener(new LeaderLatchListener(){
            private final long electionTimeoutMs;
            private final long electionPollingMs;
            private final long delayForLeaderCallbackMs;
            {
                this.electionTimeoutMs = ZKClusterClient.this.config.getElectionTimeoutMilliSecs();
                this.electionPollingMs = ZKClusterClient.this.config.getElectionPollingMilliSecs();
                this.delayForLeaderCallbackMs = ZKClusterClient.this.config.getElectionDelayForLeaderCallbackMilliSecs();
            }

            @Override
            public void notLeader() {
                logger.info("Lost latch {} for election {}.", (Object)id, (Object)name);
                if (leaderLatch.getState() == LeaderLatch.State.CLOSED) {
                    listener.onCancelled();
                    return;
                }
                if (listener instanceof ZKElectionListener) {
                    ((ZKElectionListener)listener).onConnectionLoss();
                }
                final long savedLeaderElectedGeneration = leaderElectedGeneration.get();
                final Future newLeader = ZKClusterClient.this.executorService.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        while (true) {
                            if (leaderLatch.hasLeadership()) {
                                if (listener instanceof ZKElectionListener) {
                                    ((ZKElectionListener)listener).onReconnection();
                                }
                                TimeUnit.MILLISECONDS.sleep(delayForLeaderCallbackMs);
                                return null;
                            }
                            Participant participant = leaderLatch.getLeader();
                            if (listener instanceof ZKElectionListener) {
                                ((ZKElectionListener)listener).onReconnection();
                            }
                            if (participant.isLeader()) {
                                TimeUnit.MILLISECONDS.sleep(delayForLeaderCallbackMs);
                                return null;
                            }
                            TimeUnit.MILLISECONDS.sleep(electionPollingMs);
                        }
                    }
                });
                Future newLeaderWithTimeout = ZKClusterClient.this.executorService.submit(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        try {
                            return (Void)newLeader.get(electionTimeoutMs, TimeUnit.MILLISECONDS);
                        }
                        catch (TimeoutException e) {
                            logger.info("Not able to get election status in {}ms for {}. Cancelling election...", (Object)electionTimeoutMs, (Object)name);
                            newLeader.cancel(true);
                            throw e;
                        }
                    }
                });
                Futures.addCallback(newLeaderWithTimeout, new FutureCallback<Void>(){

                    @Override
                    public void onSuccess(Void v) {
                        this.checkAndNotifyCancelled(savedLeaderElectedGeneration);
                    }

                    @Override
                    public void onFailure(Throwable t2) {
                        if (t2 instanceof CancellationException) {
                            return;
                        }
                        this.checkAndNotifyCancelled(savedLeaderElectedGeneration);
                    }
                }, MoreExecutors.directExecutor());
                newLeaderRef.set(newLeaderWithTimeout);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void isLeader() {
                if (listener instanceof ZKElectionListener) {
                    ((ZKElectionListener)listener).onBeginIsLeader();
                }
                logger.info("Acquired latch {} for election {}.", (Object)id, (Object)name);
                ListenableFuture newLeader = newLeaderRef.getAndSet(null);
                if (newLeader != null) {
                    newLeader.cancel(false);
                }
                1 var2_2 = this;
                synchronized (var2_2) {
                    leaderElectedGeneration.getAndIncrement();
                    listener.onElected();
                }
            }

            private synchronized void checkAndNotifyCancelled(long svdLeaderGeneration) {
                if (leaderElectedGeneration.get() == svdLeaderGeneration) {
                    logger.info("New leader elected. Invoke cancel on listener");
                    listener.onCancelled();
                }
            }
        });
        try {
            leaderLatch.start();
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return new ElectionRegistrationHandle(){

            @Override
            public void close() {
                try {
                    leaderLatch.close();
                    this.deleteServiceLeaderElectionPath();
                }
                catch (IOException e) {
                    logger.error("Error when closing registration handle for election {}", (Object)name, (Object)e);
                }
            }

            @Override
            public Object synchronizer() {
                return leaderLatch;
            }

            @Override
            public int instanceCount() {
                try {
                    return leaderLatch.getParticipants().size();
                }
                catch (Exception e) {
                    logger.error("Unable to get leader latch participants count for {}", (Object)name, (Object)e);
                    return 0;
                }
            }

            private void deleteServiceLeaderElectionPath() {
                try {
                    boolean isZkConnected;
                    boolean bl = isZkConnected = ZKClusterClient.this.isConnected != null && ZKClusterClient.this.isConnected.equals(true);
                    if (isZkConnected && ZKClusterClient.this.curator.checkExists().forPath(latchPath) != null) {
                        List allChildren = (List)ZKClusterClient.this.curator.getChildren().forPath(latchPath);
                        if (allChildren.isEmpty()) {
                            ((ChildrenDeletable)ZKClusterClient.this.curator.delete().guaranteed()).forPath(latchPath);
                            logger.info("Closed leader latch. Deleted latch path {}", (Object)latchPath);
                        } else {
                            logger.info("Closed leader latch. Nothing to do about latch path {}. It has children: {}", (Object)latchPath, (Object)allChildren.size());
                        }
                    } else if (!isZkConnected) {
                        logger.warn("Closed leader latch. Nothing to do about latch path {}. Not connected to ZK", (Object)latchPath);
                    }
                }
                catch (Exception e) {
                    logger.warn("Could not delete latch path {}", (Object)latchPath, (Object)e);
                }
            }
        };
    }

    /**
     * Returns the base path for leader latches: the cluster id path followed by
     * {@code /leader-latch/} (trailing slash included).
     */
    private String getRootLatchPath() {
        final String latchRoot = this.clusterIdPath + "/leader-latch/";
        return latchRoot;
    }

    /**
     * Creates a service set called {@code name} on top of this client's service discovery.
     *
     * @param name service set name
     * @return the new service set
     */
    public ZKServiceSet newServiceSet(String name) {
        ZKServiceSet serviceSet = new ZKServiceSet(name, this.discovery);
        return serviceSet;
    }

    /**
     * Best-effort removal of the node {@code <clusterIdPath>/<name>}.
     *
     * <p>The node is deleted only when the client is currently connected, the node exists and it
     * has no children. Every other outcome, including any exception, is only logged.
     *
     * @param name service set name whose node should be removed
     */
    public void deleteServiceSetZkNode(String name) {
        final String zkNodePath = this.clusterIdPath + "/" + name;
        try {
            if (!Boolean.TRUE.equals(this.isConnected)) {
                logger.warn("Deleted ZKServiceSet. Nothing to do about zk node path {}. Not connected to ZK", zkNodePath);
                return;
            }
            if (this.curator.checkExists().forPath(zkNodePath) == null) {
                // Node is already gone; nothing to delete.
                return;
            }
            List<String> remainingChildren = this.curator.getChildren().forPath(zkNodePath);
            if (!remainingChildren.isEmpty()) {
                logger.info("Deleted ZKServiceSet. Nothing to do about zk node path {}. It has children: {}", zkNodePath, remainingChildren.size());
                return;
            }
            ((ChildrenDeletable) this.curator.delete().guaranteed()).forPath(zkNodePath);
            logger.info("Deleted ZKServiceSet zk node path {}", zkNodePath);
        }
        catch (Exception failure) {
            logger.warn("Deleted ZKServiceSet - Could not delete zk node path {}", zkNodePath, failure);
        }
    }

    /**
     * Builds (but does not start) a service discovery for node endpoints on this client's Curator
     * instance.
     *
     * @param clusterId value used as the discovery base path
     * @return the unstarted discovery
     */
    private ServiceDiscovery<CoordinationProtos.NodeEndpoint> newDiscovery(String clusterId) {
        return ServiceDiscoveryBuilder.builder(CoordinationProtos.NodeEndpoint.class)
                .basePath(clusterId)
                .client(this.curator)
                .serializer(ServiceInstanceHelper.SERIALIZER)
                .build();
    }

    /**
     * {@link ZookeeperFactory} that pins the connection parameters of its first call.
     *
     * <p>The connect string, session timeout and read-only flag passed to the first
     * {@code newZooKeeper} call are stored. Every later call creates a new {@link ZooKeeper}
     * with those stored values and only takes the watcher from the call itself.
     *
     * <p>{@code newZooKeeper} is synchronized so the check-then-store on the first call and the
     * update of the last-created client are safe under concurrent use, as {@code @ThreadSafe} promises.
     */
    @ThreadSafe
    static class ZKClientFactory
    implements ZookeeperFactory {
        // Most recently created client; null until the first call.
        private ZooKeeper client;
        // Parameters captured from the first call.
        private String connectString;
        private int sessionTimeout;
        private boolean canBeReadOnly;

        ZKClientFactory() {
        }

        /**
         * Creates a new ZooKeeper handle.
         *
         * @param connectString  connect string; must not be null (only the first call's value is used)
         * @param sessionTimeout session timeout, must be positive (only the first call's value is used)
         * @param watcher        watcher for the new handle
         * @param canBeReadOnly  read-only flag (only the first call's value is used)
         * @return the newly created handle
         * @throws Exception if the handle cannot be created
         */
        @Override
        public synchronized ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
            Preconditions.checkNotNull(connectString);
            Preconditions.checkArgument(sessionTimeout > 0, "sessionTimeout should be a positive integer");
            if (this.client == null) {
                this.connectString = connectString;
                this.sessionTimeout = sessionTimeout;
                this.canBeReadOnly = canBeReadOnly;
            }
            logger.info("Creating new Zookeeper client with arguments: {}, {}, {}.", this.connectString, this.sessionTimeout, this.canBeReadOnly);
            this.client = new ZooKeeper(this.connectString, this.sessionTimeout, watcher, this.canBeReadOnly);
            return this.client;
        }
    }

    /**
     * One-shot listener: on the first {@code CONNECTED} state it releases the initial-connection
     * latch and unregisters itself.
     */
    private final class InitialConnectionListener
    implements ConnectionStateListener {
        private InitialConnectionListener() {
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState != ConnectionState.CONNECTED) {
                return;
            }
            ZKClusterClient.this.initialConnection.countDown();
            client.getConnectionStateListenable().removeListener(this);
        }
    }

    /**
     * Tracks the connection state: records whether the client is connected, bumps the matching
     * counter (reconnected / lost / suspended) and forwards the state to the connection-lost handler.
     */
    private final class ConnectionListener
    implements ConnectionStateListener {
        private ConnectionListener() {
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            final boolean connectedNow = newState.isConnected();
            ZKClusterClient.this.isConnected = connectedNow;
            logger.info("ZKClusterClient: new state received[{}] - isConnected: {}", newState, connectedNow);

            // Any disconnected state other than LOST is counted as "suspended".
            final SimpleCounter stateCounter;
            if (connectedNow) {
                stateCounter = ZK_RECONNECTED_COUNTER;
            } else if (newState == ConnectionState.LOST) {
                stateCounter = ZK_SESSION_LOST_COUNTER;
            } else {
                stateCounter = ZK_SESSION_SUSPENDED_COUNTER;
            }
            stateCounter.increment();

            ZKClusterClient.this.connectionLostHandler.handleConnectionState(newState);
        }
    }
}

