/*
 * Decompiled with CFR 0.152.
 */
package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.IOUtils;
import com.navercorp.pinpoint.profiler.context.Span;
import com.navercorp.pinpoint.profiler.context.SpanChunk;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.sender.AsyncQueueingExecutor;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.DefaultAsyncQueueingExecutorListener;
import com.navercorp.pinpoint.profiler.sender.HeaderTBaseSerializerPoolFactory;
import com.navercorp.pinpoint.profiler.sender.PartitionedByteBufferLocator;
import com.navercorp.pinpoint.profiler.sender.SpanStreamSendData;
import com.navercorp.pinpoint.profiler.sender.SpanStreamSendDataFactory;
import com.navercorp.pinpoint.profiler.sender.SpanStreamSendDataMode;
import com.navercorp.pinpoint.profiler.sender.SpanStreamSendDataSerializer;
import com.navercorp.pinpoint.profiler.sender.StandbySpanStreamDataFlushHandler;
import com.navercorp.pinpoint.profiler.sender.StandbySpanStreamDataSendWorker;
import com.navercorp.pinpoint.profiler.sender.StandbySpanStreamDataStorage;
import com.navercorp.pinpoint.profiler.sender.planer.SpanChunkStreamSendDataPlaner;
import com.navercorp.pinpoint.profiler.util.ByteBufferUtils;
import com.navercorp.pinpoint.profiler.util.ObjectPool;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Iterator;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpanStreamUdpSender
implements DataSender {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    public static final int SOCKET_TIMEOUT = 5000;
    public static final int SEND_BUFFER_SIZE = 0x100000;
    public static final int DEFAULT_BUFFER_SIZE = 16384;
    public static final int UDP_MAX_PACKET_LENGTH = 65507;
    private final SpanStreamSendDataFactory spanStreamSendDataFactory;
    private final DatagramChannel udpChannel;
    private final AsyncQueueingExecutor<Object> executor;
    private final ObjectPool<HeaderTBaseSerializer> serializerPool;
    private final SpanStreamSendDataSerializer spanStreamSendDataSerializer;
    private final StandbySpanStreamDataSendWorker standbySpanStreamDataSendWorker;
    private final MessageConverter<TBase<?, ?>> messageConverter;

    public SpanStreamUdpSender(String host, int port, String threadName, int queueSize, MessageConverter<TBase<?, ?>> messageConverter) {
        this(host, port, threadName, queueSize, 5000, 0x100000, messageConverter);
    }

    public SpanStreamUdpSender(String host, int port, String threadName, int queueSize, int timeout, int sendBufferSize, MessageConverter<TBase<?, ?>> messageConverter) {
        this(host, port, threadName, queueSize, timeout, sendBufferSize, 16384, messageConverter);
    }

    public SpanStreamUdpSender(String host, int port, String threadName, int queueSize, int timeout, int sendBufferSize, int dataBufferSize, MessageConverter<TBase<?, ?>> messageConverter) {
        if (host == null) {
            throw new NullPointerException("host must not be null");
        }
        if (threadName == null) {
            throw new NullPointerException("threadName must not be null");
        }
        if (queueSize <= 0) {
            throw new IllegalArgumentException("queueSize");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout");
        }
        if (sendBufferSize <= 0) {
            throw new IllegalArgumentException("sendBufferSize");
        }
        this.logger.info("UdpDataSender initialized. host={}, port={}", (Object)host, (Object)port);
        this.udpChannel = this.createChannel(host, port, timeout, sendBufferSize);
        HeaderTBaseSerializerPoolFactory headerTBaseSerializerPoolFactory = new HeaderTBaseSerializerPoolFactory(false, dataBufferSize, true);
        this.serializerPool = new ObjectPool<HeaderTBaseSerializer>(headerTBaseSerializerPoolFactory, 16);
        this.spanStreamSendDataSerializer = new SpanStreamSendDataSerializer();
        this.spanStreamSendDataFactory = new SpanStreamSendDataFactory(dataBufferSize, 16, this.serializerPool);
        this.standbySpanStreamDataSendWorker = new StandbySpanStreamDataSendWorker(new FlushHandler(), new StandbySpanStreamDataStorage());
        this.standbySpanStreamDataSendWorker.start();
        this.executor = this.createAsyncQueueingExecutor(queueSize, threadName);
        this.messageConverter = (MessageConverter)Assert.requireNonNull(messageConverter, (String)"messageConverter must not be null");
    }

    private AsyncQueueingExecutor<Object> createAsyncQueueingExecutor(int queueSize, String executorName) {
        DefaultAsyncQueueingExecutorListener listener = new DefaultAsyncQueueingExecutorListener(){

            @Override
            public void execute(Object message) {
                SpanStreamUdpSender.this.sendPacket(message);
            }
        };
        AsyncQueueingExecutor<Object> executor = new AsyncQueueingExecutor<Object>(queueSize, executorName, listener);
        return executor;
    }

    private DatagramChannel createChannel(String host, int port, int timeout, int sendBufferSize) {
        DatagramChannel datagramChannel = null;
        DatagramSocket socket = null;
        try {
            int checkSendBufferSize;
            datagramChannel = DatagramChannel.open();
            socket = datagramChannel.socket();
            socket.setSoTimeout(timeout);
            socket.setSendBufferSize(sendBufferSize);
            if (this.logger.isWarnEnabled() && sendBufferSize != (checkSendBufferSize = socket.getSendBufferSize())) {
                this.logger.warn("DatagramChannel.setSendBufferSize() error. {}!={}", (Object)sendBufferSize, (Object)checkSendBufferSize);
            }
            InetSocketAddress serverAddress = new InetSocketAddress(host, port);
            datagramChannel.connect(serverAddress);
            return datagramChannel;
        }
        catch (IOException e) {
            IOUtils.closeQuietly(socket);
            IOUtils.closeQuietly((Closeable)datagramChannel);
            throw new IllegalStateException("DatagramChannel create fail. Cause" + e.getMessage(), e);
        }
    }

    public boolean send(Object data) {
        return this.executor.execute(data);
    }

    @Override
    public void stop() {
        try {
            this.standbySpanStreamDataSendWorker.stop();
        }
        catch (Exception e) {
            this.logger.debug("Failed to stop standbySpanStreamDataSendWorker.", (Throwable)e);
        }
        try {
            this.udpChannel.close();
        }
        catch (IOException e) {
            this.logger.debug("Failed to close udp channel.", (Throwable)e);
        }
        this.executor.stop();
    }

    private void sendPacket(Object message) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sendPacket message:{}", message);
        }
        if (message instanceof Span) {
            this.handleSpan((Span)message);
        } else if (message instanceof SpanChunk) {
            this.handleSpanChunk((SpanChunk)message);
        } else {
            this.logger.info("sendPacket fail. invalid type:{}", (Object)this.messageToString(message));
        }
    }

    private String messageToString(Object message) {
        if (message == null) {
            return null;
        }
        return message.getClass().toString();
    }

    private void handleSpan(Span span) {
        TBase<?, ?> message;
        TSpan tSpan;
        if (span == null) {
            return;
        }
        HeaderTBaseSerializer serializer = this.serializerPool.getObject();
        PartitionedByteBufferLocator partitionedByteBufferLocator = this.spanStreamSendDataSerializer.serializeSpanStream(serializer, tSpan = (TSpan)(message = this.messageConverter.toMessage(span)));
        if (partitionedByteBufferLocator == null) {
            this.serializerPool.returnObject(serializer);
            return;
        }
        this.doAddAndFlush(partitionedByteBufferLocator, serializer);
    }

    private void handleSpanChunk(SpanChunk spanChunk) {
        TBase<?, ?> message;
        TSpanChunk tSpanChunk;
        if (spanChunk == null) {
            return;
        }
        HeaderTBaseSerializer serializer = this.serializerPool.getObject();
        PartitionedByteBufferLocator partitionedByteBufferLocator = this.spanStreamSendDataSerializer.serializeSpanChunkStream(serializer, tSpanChunk = (TSpanChunk)(message = this.messageConverter.toMessage(spanChunk)));
        if (partitionedByteBufferLocator == null) {
            this.serializerPool.returnObject(serializer);
            return;
        }
        this.doAddAndFlush(partitionedByteBufferLocator, serializer);
    }

    private void doAddAndFlush(PartitionedByteBufferLocator partitionedByteBufferLocator, HeaderTBaseSerializer serializer) {
        this.logger.debug("PartitionedByteBufferLocator {}.", (Object)partitionedByteBufferLocator);
        SpanStreamSendData currentSpanStreamSendData = this.standbySpanStreamDataSendWorker.getStandbySpanStreamSendData();
        if (currentSpanStreamSendData == null) {
            currentSpanStreamSendData = this.spanStreamSendDataFactory.create();
        }
        try {
            if (!currentSpanStreamSendData.addBuffer(partitionedByteBufferLocator.getByteBuffer())) {
                SpanChunkStreamSendDataPlaner sendDataPlaner = new SpanChunkStreamSendDataPlaner(partitionedByteBufferLocator, this.spanStreamSendDataFactory);
                Iterator<SpanStreamSendData> sendDataIterator = sendDataPlaner.getSendDataIterator(currentSpanStreamSendData, serializer);
                while (sendDataIterator.hasNext()) {
                    boolean isAdded;
                    SpanStreamSendData sendData = sendDataIterator.next();
                    if (sendData.getFlushMode() == SpanStreamSendDataMode.FLUSH) {
                        this.flush(sendData);
                        continue;
                    }
                    if (sendData.getFlushMode() != SpanStreamSendDataMode.WAIT_BUFFER || (isAdded = this.standbySpanStreamDataSendWorker.addStandbySpanStreamData(sendData))) continue;
                    this.flush(sendData);
                }
            } else {
                boolean isAdded = this.standbySpanStreamDataSendWorker.addStandbySpanStreamData(currentSpanStreamSendData);
                if (!isAdded) {
                    this.flush(currentSpanStreamSendData);
                }
            }
        }
        catch (IOException e) {
            this.logger.warn("UDPChannel write fail.", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flush(SpanStreamSendData spanStreamSendData) throws IOException {
        if (spanStreamSendData == null) {
            return;
        }
        ByteBuffer[] byteBuffers = spanStreamSendData.getSendBuffers();
        int remainingLength = ByteBufferUtils.getRemaining(byteBuffers);
        try {
            if (remainingLength != 0) {
                long sentBufferSize = this.udpChannel.write(byteBuffers);
                if ((long)remainingLength != sentBufferSize) {
                    this.logger.warn("sent buffer {}/{}.", (Object)sentBufferSize, (Object)remainingLength);
                } else {
                    this.logger.debug("Data sent. size:{}, {}", (Object)sentBufferSize);
                }
            }
        }
        finally {
            spanStreamSendData.done();
        }
    }

    class FlushHandler
    implements StandbySpanStreamDataFlushHandler {
        FlushHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleFlush(SpanStreamSendData spanStreamSendData) {
            if (spanStreamSendData == null) {
                return;
            }
            try {
                long sentBufferSize;
                ByteBuffer[] byteBuffers = spanStreamSendData.getSendBuffers();
                int remainingLength = ByteBufferUtils.getRemaining(byteBuffers);
                if (remainingLength != 0 && (long)remainingLength != (sentBufferSize = SpanStreamUdpSender.this.udpChannel.write(byteBuffers))) {
                    SpanStreamUdpSender.this.logger.warn("sent buffer {}/{}.", (Object)sentBufferSize, (Object)remainingLength);
                }
            }
            catch (IOException e) {
                SpanStreamUdpSender.this.logger.warn("Failed to flush span stream data.", (Throwable)e);
            }
            finally {
                spanStreamSendData.done();
            }
        }

        @Override
        public void exceptionCaught(SpanStreamSendData spanStreamSendData, Throwable e) {
            SpanStreamUdpSender.this.logger.warn("Failed to flush span stream data.", e);
        }
    }
}

