/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;

public class TestMultiThreadedSync {
    static final int blockSize = 0x100000;
    static final int numBlocks = 10;
    static final int fileSize = 0xA00001;
    private static final int NUM_THREADS = 10;
    private static final int WRITE_SIZE = 517;
    private static final int NUM_WRITES_PER_THREAD = 1000;
    private byte[] toWrite = null;

    private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) throws IOException {
        FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)repl, 0x100000L);
        return stm;
    }

    private void initBuffer(int size) {
        long seed = AppendTestUtil.nextLong();
        this.toWrite = AppendTestUtil.randomBytes(seed, size);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleSyncers() throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
        FileSystem fs = cluster.getFileSystem();
        Path p = new Path("/multiple-syncers.dat");
        try {
            this.doMultithreadedWrites(conf, p, 10, 517, 1000);
        }
        finally {
            fs.close();
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSyncWhileClosing() throws Throwable {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
        FileSystem fs = cluster.getFileSystem();
        Path p = new Path("/sync-and-close.dat");
        final FSDataOutputStream stm = this.createFile(fs, p, 1);
        ArrayList<1> flushers = new ArrayList<1>();
        final AtomicReference thrown = new AtomicReference();
        try {
            int i;
            for (i = 0; i < 10; ++i) {
                Thread thread = new Thread(){

                    @Override
                    public void run() {
                        try {
                            try {
                                while (true) {
                                    stm.sync();
                                }
                            }
                            catch (IOException ioe) {
                                if (!ioe.toString().contains("DFSOutputStream is closed")) {
                                    throw ioe;
                                }
                                return;
                            }
                        }
                        catch (Throwable t) {
                            thrown.set(t);
                            return;
                        }
                    }
                };
                thread.start();
                flushers.add(thread);
            }
            for (i = 0; i < 10000; ++i) {
                stm.write(1);
            }
            stm.close();
            for (Thread thread : flushers) {
                thread.join();
            }
            if (thrown.get() != null) {
                throw (Throwable)thrown.get();
            }
        }
        finally {
            fs.close();
            cluster.shutdown();
        }
    }

    public void doMultithreadedWrites(Configuration conf, Path p, int numThreads, int bufferSize, int numWrites) throws Exception {
        this.initBuffer(bufferSize);
        FileSystem fs = p.getFileSystem(conf);
        FSDataOutputStream stm = this.createFile(fs, p, 1);
        System.out.println("Created file simpleFlush.dat");
        stm.sync();
        stm.sync();
        stm.write(1);
        stm.sync();
        stm.sync();
        CountDownLatch countdown = new CountDownLatch(1);
        ArrayList<AppendTestUtil.WriterThread> threads = new ArrayList<AppendTestUtil.WriterThread>();
        AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        for (int i = 0; i < numThreads; ++i) {
            AppendTestUtil.WriterThread writerThread = new AppendTestUtil.WriterThread(stm, this.toWrite, thrown, countdown, numWrites);
            threads.add(writerThread);
            writerThread.start();
        }
        countdown.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        if (thrown.get() != null) {
            throw new RuntimeException("Deferred", thrown.get());
        }
        stm.close();
        System.out.println("Closed file.");
    }

    public static void main(String[] args) throws Exception {
        TestMultiThreadedSync test = new TestMultiThreadedSync();
        Configuration conf = new Configuration();
        Path p = new Path("/user/todd/test.dat");
        long st = System.nanoTime();
        test.doMultithreadedWrites(conf, p, 10, 511, 50000);
        long et = System.nanoTime();
        System.out.println("Finished in " + (et - st) / 1000000L + "ms");
    }
}

