/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.CollectSink;
import org.apache.flink.contrib.streaming.DataStreamIterator;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamUtils {
    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
        InetAddress clientAddress;
        TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionEnvironment().getConfig());
        DataStreamIterator it = new DataStreamIterator(serializer);
        StreamExecutionEnvironment env = stream.getExecutionEnvironment();
        if (env instanceof RemoteStreamEnvironment) {
            String host = ((RemoteStreamEnvironment)env).getHost();
            int port = ((RemoteStreamEnvironment)env).getPort();
            try {
                clientAddress = NetUtils.findConnectingAddress((InetSocketAddress)new InetSocketAddress(host, port), (long)2000L, (long)400L);
            }
            catch (IOException e) {
                throw new RuntimeException("IOException while trying to connect to the master", e);
            }
        }
        try {
            clientAddress = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new RuntimeException("getLocalHost failed", e);
        }
        DataStreamSink sink = stream.addSink(new CollectSink(clientAddress, it.getPort(), serializer));
        sink.setParallelism(1);
        new CallExecute<OUT>(stream).start();
        return it;
    }

    private static class CallExecute<OUT>
    extends Thread {
        DataStream<OUT> stream;

        public CallExecute(DataStream<OUT> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            try {
                this.stream.getExecutionEnvironment().execute();
            }
            catch (Exception e) {
                throw new RuntimeException("Exception in execute()", e);
            }
        }
    }
}

