/*
 * Decompiled with CFR 0.152.
 */
package com.navercorp.pinpoint.profiler.receiver.service;

import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogramUtils;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository;
import com.navercorp.pinpoint.profiler.receiver.CommandSerializer;
import com.navercorp.pinpoint.profiler.receiver.ProfilerRequestCommandService;
import com.navercorp.pinpoint.profiler.receiver.ProfilerStreamCommandService;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannel;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateCode;
import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadCountRes;
import com.navercorp.pinpoint.thrift.io.TCommandType;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
import java.io.Closeable;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command service that reports the active trace histogram held by an
 * {@link ActiveTraceRepository}.
 *
 * <p>Two delivery modes are implemented:
 * <ul>
 *   <li>request/response: {@link #requestCommandService(TBase)} builds and returns a single
 *       {@link TCmdActiveThreadCountRes};</li>
 *   <li>streaming: {@link #streamCommandService(TBase, ServerStreamChannelContext)} registers a
 *       state-change handler on the stream channel; while at least one channel is registered,
 *       a timer task serializes a fresh response every {@code flushDelay} milliseconds and
 *       sends it to each registered channel.</li>
 * </ul>
 *
 * <p>{@link #close()} cancels the backing {@link Timer}.
 */
public class ActiveThreadCountService
        implements ProfilerRequestCommandService<TBase<?, ?>, TBase<?, ?>>,
        ProfilerStreamCommandService<TBase<?, ?>>,
        Closeable {

    /** Flush delay/period, in milliseconds, used when the caller does not supply one. */
    private static final long DEFAULT_FLUSH_DELAY = 1000L;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    // Debug flag is sampled once at construction time.
    private final boolean isDebug = logger.isDebugEnabled();

    private final StreamChannelStateChangeEventHandler stateChangeEventHandler = new ActiveThreadCountStreamChannelStateChangeEventHandler();

    // Daemon timer that drives the periodic push to the stream channels.
    private final Timer timer = new Timer("Pinpoint-ActiveThreadCountService-Timer", true);
    private final long flushDelay;

    // Channels currently receiving periodic updates; iterated by the timer thread while
    // being mutated from the state-change handler.
    private final List<ServerStreamChannel> streamChannelRepository = new CopyOnWriteArrayList<ServerStreamChannel>();
    private final ActiveTraceRepository activeTraceRepository;

    /**
     * Creates a service that flushes every {@link #DEFAULT_FLUSH_DELAY} milliseconds.
     *
     * @param activeTraceRepository source of the active trace histogram
     * @throws NullPointerException if {@code activeTraceRepository} is {@code null}
     */
    public ActiveThreadCountService(ActiveTraceRepository activeTraceRepository) {
        this(activeTraceRepository, DEFAULT_FLUSH_DELAY);
    }

    /**
     * Creates a service with an explicit flush interval.
     *
     * @param activeTraceRepository source of the active trace histogram
     * @param flushDelay            initial delay and period, in milliseconds, handed to
     *                              {@link Timer#scheduleAtFixedRate(TimerTask, long, long)}
     *                              when the first stream channel connects; it is not
     *                              validated here
     * @throws NullPointerException if {@code activeTraceRepository} is {@code null}
     */
    public ActiveThreadCountService(ActiveTraceRepository activeTraceRepository, long flushDelay) {
        if (activeTraceRepository == null) {
            throw new NullPointerException("activeTraceRepository");
        }
        this.activeTraceRepository = activeTraceRepository;
        this.flushDelay = flushDelay;
    }

    /**
     * @return the code of {@link TCommandType#ACTIVE_THREAD_COUNT}
     */
    @Override
    public short getCommandServiceCode() {
        return TCommandType.ACTIVE_THREAD_COUNT.getCode();
    }

    /**
     * Handles a one-shot request by returning the current active thread count response.
     *
     * @param activeThreadCountObject the request; only checked for {@code null}, its content
     *                                is not read
     * @return a freshly built {@link TCmdActiveThreadCountRes}
     * @throws NullPointerException if {@code activeThreadCountObject} is {@code null}
     */
    @Override
    public TBase<?, ?> requestCommandService(TBase<?, ?> activeThreadCountObject) {
        if (activeThreadCountObject == null) {
            throw new NullPointerException("activeThreadCountObject must not be null.");
        }
        return getActiveThreadCountResponse();
    }

    /**
     * Registers this service's state-change handler on the stream channel of the given context.
     * The channel itself is only added to the push list once the handler observes
     * {@code CONNECTED}.
     *
     * @param tBase                the stream request; only logged
     * @param streamChannelContext context whose stream channel receives the handler
     * @return always {@link StreamCode#OK}
     */
    @Override
    public StreamCode streamCommandService(TBase<?, ?> tBase, ServerStreamChannelContext streamChannelContext) {
        logger.info("streamCommandService object:{}, streamChannelContext:{}", tBase, streamChannelContext);
        streamChannelContext.getStreamChannel().addStateChangeEventHandler(stateChangeEventHandler);
        return StreamCode.OK;
    }

    /**
     * Builds a response from the histogram the repository reports for the current wall-clock
     * time. The same timestamp is used for the histogram query and for the response.
     */
    private TCmdActiveThreadCountRes getActiveThreadCountResponse() {
        long currentTime = System.currentTimeMillis();
        ActiveTraceHistogram histogram = activeTraceRepository.getActiveTraceHistogram(currentTime);

        TCmdActiveThreadCountRes response = new TCmdActiveThreadCountRes();
        response.setHistogramSchemaType(histogram.getHistogramSchema().getTypeCode());
        List<Integer> activeTraceCounts = ActiveTraceHistogramUtils.asList(histogram);
        response.setActiveThreadCount(activeTraceCounts);
        response.setTimeStamp(currentTime);
        return response;
    }

    /**
     * Cancels the timer, which stops any scheduled push task. The timer field is final and
     * always initialized, so no null check is needed; {@link Timer#cancel()} may be called
     * repeatedly.
     */
    @Override
    public void close() {
        timer.cancel();
    }

    /**
     * Periodic task that builds one response per tick and sends it, serialized, to every
     * channel currently in {@code streamChannelRepository}.
     */
    private class ActiveThreadCountTimerTask extends TimerTask {

        @Override
        public void run() {
            if (isDebug) {
                logger.debug("ActiveThreadCountTimerTask started. target-streams:{}", streamChannelRepository);
            }
            // Nothing may escape run(): an uncaught exception in a TimerTask terminates the
            // Timer's thread and with it every later execution.
            try {
                TCmdActiveThreadCountRes activeThreadCountResponse = getActiveThreadCountResponse();
                for (ServerStreamChannel serverStreamChannel : streamChannelRepository) {
                    sendTo(serverStreamChannel, activeThreadCountResponse);
                }
            } catch (Exception e) {
                logger.warn("failed to execute ActiveThreadCountTimerTask.run method. message:{}", e.getMessage(), e);
            }
        }

        /**
         * Serializes the response and sends it to one channel. A failure is logged and
         * swallowed so that one broken channel does not prevent delivery to the others in
         * the same tick.
         */
        private void sendTo(ServerStreamChannel serverStreamChannel, TCmdActiveThreadCountRes activeThreadCountResponse) {
            try {
                // Each channel gets its own byte array; whether sendData retains the array
                // is not visible from here, so the payload is deliberately not shared.
                byte[] payload = SerializationUtils.serialize(activeThreadCountResponse, CommandSerializer.SERIALIZER_FACTORY, null);
                if (payload != null) {
                    serverStreamChannel.sendData(payload);
                }
            } catch (Exception e) {
                logger.warn("failed to send active thread count. ServerStreamChannel:{}, message:{}", new Object[]{serverStreamChannel, e.getMessage(), e});
            }
        }
    }

    /**
     * Tracks stream channel state changes: adds a channel to the push list when it connects,
     * removes it when it closes or enters an illegal state, and starts/stops the periodic
     * task when the list becomes non-empty/empty. All transitions run under {@code lock}.
     */
    private class ActiveThreadCountStreamChannelStateChangeEventHandler
            implements StreamChannelStateChangeEventHandler<ServerStreamChannel> {

        private final Object lock = new Object();
        // Holds the currently scheduled task, or null when no task is scheduled.
        private final AtomicReference<ActiveThreadCountTimerTask> currentTaskReference = new AtomicReference<ActiveThreadCountTimerTask>();

        /**
         * Reacts to a channel state change.
         *
         * @param streamChannel    the channel whose state changed
         * @param updatedStateCode the new state
         * @throws Exception any runtime exception raised while scheduling the timer task
         *                   (for example when the timer was already cancelled by
         *                   {@link ActiveThreadCountService#close()})
         */
        public void eventPerformed(ServerStreamChannel streamChannel, StreamChannelStateCode updatedStateCode) throws Exception {
            logger.info("eventPerformed. ServerStreamChannel:{}, StreamChannelStateCode:{}.", streamChannel, updatedStateCode);
            synchronized (lock) {
                switch (updatedStateCode) {
                    case CONNECTED: {
                        streamChannelRepository.add(streamChannel);
                        ActiveThreadCountTimerTask activeThreadCountTimerTask = new ActiveThreadCountTimerTask();
                        // Only the transition "no task -> task" schedules; later connects reuse it.
                        if (currentTaskReference.compareAndSet(null, activeThreadCountTimerTask)) {
                            logger.info("turn on ActiveThreadCountTimerTask.");
                            try {
                                timer.scheduleAtFixedRate(activeThreadCountTimerTask, flushDelay, flushDelay);
                            } catch (RuntimeException e) {
                                // The task was never scheduled: forget it, otherwise every
                                // later CONNECTED would believe a task is already running.
                                currentTaskReference.compareAndSet(activeThreadCountTimerTask, null);
                                throw e;
                            }
                        }
                        break;
                    }
                    case CLOSED:
                    case ILLEGAL_STATE: {
                        boolean removed = streamChannelRepository.remove(streamChannel);
                        if (!removed || !streamChannelRepository.isEmpty()) {
                            break;
                        }
                        // The last registered channel is gone: stop the periodic task.
                        ActiveThreadCountTimerTask currentTask = currentTaskReference.get();
                        if (currentTask == null) {
                            break;
                        }
                        currentTaskReference.compareAndSet(currentTask, null);
                        currentTask.cancel();
                        logger.info("turn off ActiveThreadCountTimerTask.");
                        break;
                    }
                    default:
                        // Other states require no bookkeeping.
                        break;
                }
            }
        }

        /**
         * Logs an exception reported for a channel; no state is changed.
         */
        public void exceptionCaught(ServerStreamChannel streamChannel, StreamChannelStateCode updatedStateCode, Throwable e) {
            logger.warn("exceptionCaught caused:{}. ServerStreamChannel:{}, StreamChannelStateCode:{}.", new Object[]{e.getMessage(), streamChannel, updatedStateCode, e});
        }
    }
}

