/*
 * Decompiled with CFR 0.152.
 */
package com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk;

import com.dremio.jdbc.shaded.com.dremio.exec.proto.CoordinationProtos;
import com.dremio.jdbc.shaded.com.dremio.service.Service;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.AbstractServiceSet;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.RegistrationHandle;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKClusterCoordinator;
import com.dremio.jdbc.shaded.com.dremio.service.coordinator.zk.ZKRegistrationHandle;
import com.dremio.jdbc.shaded.com.google.common.base.Function;
import com.dremio.jdbc.shaded.com.google.common.base.Throwables;
import com.dremio.jdbc.shaded.com.google.common.collect.Collections2;
import com.dremio.jdbc.shaded.org.apache.curator.framework.CuratorFramework;
import com.dremio.jdbc.shaded.org.apache.curator.framework.state.ConnectionState;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.ServiceCache;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.ServiceDiscovery;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.ServiceInstance;
import com.dremio.jdbc.shaded.org.apache.curator.x.discovery.details.ServiceCacheListener;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;

final class ZKServiceSet
extends AbstractServiceSet
implements Service {
    private final String serviceName;
    private final ServiceDiscovery<CoordinationProtos.NodeEndpoint> discovery;
    private final ServiceCache<CoordinationProtos.NodeEndpoint> serviceCache;
    private volatile Collection<CoordinationProtos.NodeEndpoint> endpoints = Collections.emptyList();

    public ZKServiceSet(String serviceName, ServiceDiscovery<CoordinationProtos.NodeEndpoint> discovery) {
        this.serviceName = serviceName;
        this.discovery = discovery;
        this.serviceCache = discovery.serviceCacheBuilder().name(serviceName).build();
    }

    @Override
    public void start() throws Exception {
        this.serviceCache.start();
        this.serviceCache.addListener((CoordinationProtos.NodeEndpoint)((Object)new EndpointListener()));
        this.updateEndpoints();
    }

    @Override
    public RegistrationHandle register(CoordinationProtos.NodeEndpoint endpoint) {
        try {
            ServiceInstance<CoordinationProtos.NodeEndpoint> serviceInstance = this.newServiceInstance(this.serviceName, endpoint);
            this.discovery.registerService(serviceInstance);
            return new ZKRegistrationHandle(this, serviceInstance);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    void unregister(ZKRegistrationHandle handle) {
        try {
            this.discovery.unregisterService(handle.getServiceInstance());
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    private synchronized void updateEndpoints() {
        try {
            Collection<CoordinationProtos.NodeEndpoint> newNodesSet = this.getAvailableEndpoints();
            HashSet<CoordinationProtos.NodeEndpoint> unregisteredNodes = new HashSet<CoordinationProtos.NodeEndpoint>(this.endpoints);
            unregisteredNodes.removeAll(newNodesSet);
            HashSet<CoordinationProtos.NodeEndpoint> newlyRegisteredNodes = new HashSet<CoordinationProtos.NodeEndpoint>(newNodesSet);
            newlyRegisteredNodes.removeAll(this.endpoints);
            this.endpoints = newNodesSet;
            if (ZKClusterCoordinator.logger.isDebugEnabled()) {
                StringBuilder builder = new StringBuilder();
                builder.append("Active nodes set changed.  Now includes ");
                builder.append(newNodesSet.size());
                builder.append(" total nodes.  New active nodes: \n");
                for (CoordinationProtos.NodeEndpoint bit : newNodesSet) {
                    builder.append('\t');
                    builder.append(bit.getAddress());
                    builder.append(':');
                    builder.append(bit.getUserPort());
                    builder.append(':');
                    builder.append(bit.getFabricPort());
                    builder.append(':');
                    builder.append(bit.getStartTime());
                    builder.append('\n');
                }
                ZKClusterCoordinator.logger.debug(builder.toString());
            }
            if (!unregisteredNodes.isEmpty()) {
                this.nodesUnregistered(unregisteredNodes);
            }
            if (!newlyRegisteredNodes.isEmpty()) {
                this.nodesRegistered(newlyRegisteredNodes);
            }
        }
        catch (Exception e) {
            ZKClusterCoordinator.logger.error("Failure while update SabotNode service location cache.", e);
        }
    }

    private ServiceInstance<CoordinationProtos.NodeEndpoint> newServiceInstance(String name, CoordinationProtos.NodeEndpoint endpoint) throws Exception {
        return ServiceInstance.builder().name(name).payload(endpoint).build();
    }

    @Override
    public Collection<CoordinationProtos.NodeEndpoint> getAvailableEndpoints() {
        return Collections2.transform(this.serviceCache.getInstances(), new Function<ServiceInstance<CoordinationProtos.NodeEndpoint>, CoordinationProtos.NodeEndpoint>(){

            @Override
            public CoordinationProtos.NodeEndpoint apply(ServiceInstance<CoordinationProtos.NodeEndpoint> input) {
                return input.getPayload();
            }
        });
    }

    @Override
    public void close() throws Exception {
        this.clearListeners();
        this.serviceCache.close();
    }

    private final class EndpointListener
    implements ServiceCacheListener {
        private EndpointListener() {
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
        }

        @Override
        public void cacheChanged() {
            ZKClusterCoordinator.logger.debug("Got cache changed --> updating endpoints");
            ZKServiceSet.this.updateEndpoints();
        }
    }
}

