package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.testing.TestingHttpClient;
import com.facebook.airlift.testing.Assertions;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TestThriftTaskStatus;
import com.facebook.presto.execution.buffer.TestingPagesSerdeFactory;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.SimpleLocalMemoryContext;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/facebook/presto/operator/TestExchangeClient.class */
public class TestExchangeClient {
    // Scheduler handed to every ExchangeClient built by createExchangeClient; created in setUp.
    private ScheduledExecutorService scheduler;
    // Executor passed as the final ExchangeClient constructor argument; created in setUp.
    private ExecutorService pageBufferClientCallbackExecutor;
    // Executor backing the TestingHttpClient that serves the mock exchange responses; created in setUp.
    private ExecutorService testingHttpClientExecutor;
    // Serde used by assertPageEquals to deserialize pages returned by the client.
    private static final PagesSerde PAGES_SERDE = TestingPagesSerdeFactory.testingPagesSerde();

    /**
     * Creates the executors shared by every test in this class: a four-thread scheduler,
     * a single-thread callback executor and a cached pool for the testing HTTP client.
     */
    @BeforeClass
    public void setUp() {
        scheduler = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
        pageBufferClientCallbackExecutor = Executors.newSingleThreadExecutor();
        testingHttpClientExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"));
    }

    /**
     * Shuts down whichever executors were created by {@link #setUp()} and clears the fields.
     * Each executor is null-checked so this is safe to run even if set-up did not complete.
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }

        if (pageBufferClientCallbackExecutor != null) {
            pageBufferClientCallbackExecutor.shutdownNow();
            pageBufferClientCallbackExecutor = null;
        }

        if (testingHttpClientExecutor != null) {
            testingHttpClientExecutor.shutdownNow();
            testingHttpClientExecutor = null;
        }
    }

    /** Runs the happy-path scenario with {@code false} for the serde flag and unmodified response bytes. */
    @Test
    public void testHappyPath() {
        testHappyPath(false, Function.identity());
    }

    /** Runs the happy-path scenario with {@code true} for the serde flag and unmodified response bytes. */
    @Test
    public void testHappyPathChecksum() {
        testHappyPath(true, Function.identity());
    }

    /**
     * Runs the happy-path scenario with {@code true} for the serde flag while inverting every bit
     * of the last byte of the data handed to the transform; the client is expected to reject the
     * page with a {@link PrestoException}.
     */
    @Test(expectedExceptions = {PrestoException.class}, expectedExceptionsMessageRegExp = "Received corrupted serialized page from host.*")
    public void testHappyPathChecksumFail() {
        testHappyPath(true, bytes -> {
            int last = bytes.length - 1;
            // Bitwise complement of the final byte (same result as XOR with -1).
            bytes[last] = (byte) ~bytes[last];
            return bytes;
        });
    }

    /**
     * Shared body of the happy-path tests: one location serves three pages and is marked
     * complete; the client must return them in order, then close with empty buffers.
     *
     * @param checksumEnabled forwarded to {@code TestingPagesSerdeFactory.testingPagesSerde(boolean)}
     *                        for the serde used by the mock processor
     * @param dataChanger     forwarded to the mock processor (presumably applied to the response
     *                        bytes it serves; confirm against MockExchangeRequestProcessor)
     */
    private void testHappyPath(boolean checksumEnabled, Function<byte[], byte[]> dataChanger) {
        DataSize bufferCapacity = new DataSize(32.0d, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(10.0d, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(
                maxResponseSize,
                TestingPagesSerdeFactory.testingPagesSerde(checksumEnabled),
                dataChanger);

        // Serve pages with 1, 2 and 3 positions from a single, completed location.
        URI location = URI.create("http://localhost:8080");
        for (int positions = 1; positions <= 3; positions++) {
            processor.addPage(location, createPage(positions));
        }
        processor.setComplete(location);

        ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        client.addLocation(location, TaskId.valueOf("queryid.0.0.0.0"));
        client.noMoreLocations();

        // The client must stay open while pages remain and hand them out in order.
        for (int positions = 1; positions <= 3; positions++) {
            Assert.assertFalse(client.isClosed());
            assertPageEquals(getNextPage(client), createPage(positions));
        }
        Assert.assertNull(getNextPage(client));
        Assert.assertTrue(client.isClosed());

        ExchangeClientStatus finalStatus = client.getStatus();
        Assert.assertEquals(finalStatus.getBufferedPages(), 0);
        Assert.assertEquals(finalStatus.getBufferedBytes(), 0L);
        PageBufferClientStatus locationStatus = (PageBufferClientStatus) finalStatus.getPageBufferClientStatuses().get(0);
        assertStatus(locationStatus, location, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * Verifies that locations can be added incrementally: after the first location is drained
     * the client stays open and blocked, a second location is then added and drained, and only
     * after {@code noMoreLocations()} does the client transition to closed.
     */
    @Test(timeOut = 10000)
    public void testAddLocation() throws Exception {
        DataSize bufferCapacity = new DataSize(32.0d, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(10.0d, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);
        ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize);

        // First location: pages with 1..3 positions.
        URI firstLocation = URI.create("http://localhost:8081/foo");
        for (int positions = 1; positions <= 3; positions++) {
            processor.addPage(firstLocation, createPage(positions));
        }
        processor.setComplete(firstLocation);
        client.addLocation(firstLocation, TaskId.valueOf("foo.0.0.0.0"));

        for (int positions = 1; positions <= 3; positions++) {
            Assert.assertFalse(client.isClosed());
            assertPageEquals(getNextPage(client), createPage(positions));
        }
        // Nothing more to read yet, so the blocked future must not complete within 10 ms.
        Assert.assertFalse(MoreFutures.tryGetFutureValue(client.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(client.isClosed());

        // Second location: pages with 4..6 positions.
        URI secondLocation = URI.create("http://localhost:8082/bar");
        for (int positions = 4; positions <= 6; positions++) {
            processor.addPage(secondLocation, createPage(positions));
        }
        processor.setComplete(secondLocation);
        client.addLocation(secondLocation, TaskId.valueOf("bar.0.0.0.0"));

        for (int positions = 4; positions <= 6; positions++) {
            Assert.assertFalse(client.isClosed());
            assertPageEquals(getNextPage(client), createPage(positions));
        }
        Assert.assertFalse(MoreFutures.tryGetFutureValue(client.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(client.isClosed());

        client.noMoreLocations();
        // Spin until the client reports closed; the @Test timeOut bounds this loop.
        while (!client.isClosed()) {
            Thread.sleep(1L);
        }

        ImmutableMap<URI, PageBufferClientStatus> statusByUri =
                Maps.uniqueIndex(client.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        assertStatus(statusByUri.get(firstLocation), firstLocation, "closed", 3, 3, 3, "not scheduled");
        assertStatus(statusByUri.get(secondLocation), secondLocation, "closed", 3, 3, 3, "not scheduled");
    }

    /**
     * Verifies buffering with a 1-byte buffer capacity and a 1-byte max response size:
     * the test expects exactly one page to be buffered at a time, with the next page
     * fetched only after the previous one has been polled.
     */
    @Test
    public void testBufferLimit() {
        DataSize bufferCapacity = new DataSize(1.0d, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0d, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        for (int positions = 1; positions <= 3; positions++) {
            processor.addPage(location, createPage(positions));
        }
        processor.setComplete(location);

        ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        client.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        client.noMoreLocations();
        Assert.assertFalse(client.isClosed());

        // All three waits below share this start time, i.e. a single 5 second budget.
        long startNanos = System.nanoTime();
        client.scheduleRequestIfNecessary();

        // First page arrives and is held in the buffer.
        awaitBufferedPage(client, startNanos);
        Assert.assertEquals(client.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(client.getStatus().getBufferedBytes() > 0);
        assertStatus((PageBufferClientStatus) client.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");
        assertPageEquals(client.pollPage(), createPage(1));

        // Second page.
        awaitBufferedPage(client, startNanos);
        assertStatus((PageBufferClientStatus) client.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals(client.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(client.getStatus().getBufferedBytes() > 0);
        assertPageEquals(client.pollPage(), createPage(2));

        // Third page.
        awaitBufferedPage(client, startNanos);
        assertStatus((PageBufferClientStatus) client.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals(client.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(client.getStatus().getBufferedBytes() > 0);
        assertPageEquals(getNextPage(client), createPage(3));

        // Source exhausted: the client drains and closes.
        Assert.assertNull(getNextPage(client));
        Assert.assertEquals(client.getStatus().getBufferedPages(), 0);
        Assert.assertTrue(client.getStatus().getBufferedBytes() == 0);
        Assert.assertTrue(client.isClosed());
        assertStatus((PageBufferClientStatus) client.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 5, 5, "not scheduled");
    }

    /**
     * Polls every 100 ms until the client reports at least one buffered page, failing the
     * test if 5 seconds have elapsed since {@code startNanos} when a poll begins.
     */
    private static void awaitBufferedPage(ExchangeClient client, long startNanos) {
        do {
            Assertions.assertLessThan(Duration.nanosSince(startNanos), new Duration(5.0d, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (client.getStatus().getBufferedPages() == 0);
    }

    /**
     * Verifies that closing the client part-way through a stream finishes it: after reading
     * one of three pages and calling {@code close()}, the client must report finished and
     * closed, return no further pages, hold no buffered data, and show the location's
     * page-buffer client as closed.
     */
    @Test
    public void testClose() throws Exception {
        DataSize bufferCapacity = new DataSize(1.0d, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0d, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        // The location is deliberately never marked complete.
        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));

        ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        client.addLocation(location, TaskId.valueOf("taskid.0.0.0.0"));
        client.noMoreLocations();
        Assert.assertFalse(client.isClosed());
        assertPageEquals(getNextPage(client), createPage(1));

        client.close();
        // Finishing may lag behind close(), so poll for up to 5 seconds.
        waitUntilEquals(client::isFinished, true, new Duration(5.0d, TimeUnit.SECONDS));
        Assert.assertTrue(client.isClosed());
        Assert.assertNull(client.pollPage());
        Assert.assertEquals(client.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(client.getStatus().getBufferedBytes(), 0L);

        Optional<PageBufferClientStatus> locationStatus = client.getStatus().getPageBufferClientStatuses().stream()
                .filter(status -> status.getUri().equals(location))
                .findFirst();
        Assert.assertTrue(locationStatus.isPresent());
        assertStatus(locationStatus.get(), "closed", "not scheduled");
    }

    /**
     * Verifies the request pattern across 16 locations of three pages each: while responses
     * are held back by a latch the client issues requests; once released, exactly one page per
     * location is buffered, and every recorded request carries the configured max response
     * size (16 requests at first, 64 once all pages have been consumed).
     */
    @Test
    public void testInitialRequestLimit() {
        final int locationCount = 16;
        final int pagesPerLocation = 3;
        DataSize bufferCapacity = new DataSize(16.0d, DataSize.Unit.MEGABYTE);
        DataSize maxResponseSize = new DataSize(1048576.0d, DataSize.Unit.BYTE);

        // Responses are held until the latch is released so that every location has a request in flight.
        final CountDownLatch releaseResponses = new CountDownLatch(1);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize) {
            @Override
            public Response handle(Request request) {
                if (!Uninterruptibles.awaitUninterruptibly(releaseResponses, 10L, TimeUnit.SECONDS)) {
                    throw new UncheckedTimeoutException();
                }
                return super.handle(request);
            }
        };

        ArrayList<URI> locations = new ArrayList<>();
        ArrayList<DataSize> expectedMaxSizes = new ArrayList<>();
        for (int i = 0; i < locationCount; i++) {
            URI location = URI.create("http://localhost:" + (8080 + i));
            locations.add(location);
            // NOTE(review): the page size constant is borrowed from TestThriftTaskStatus; presumably it
            // matches the 1048576-byte max response size above - confirm.
            for (int page = 0; page < pagesPerLocation; page++) {
                processor.addPage(location, createPage(TestThriftTaskStatus.PHYSICAL_WRITTEN_DATA_SIZE_IN_BYTES));
            }
            processor.setComplete(location);
            expectedMaxSizes.add(maxResponseSize);
        }

        // try-with-resources closes the client on both the success and the failure path.
        try (ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize)) {
            for (int i = 0; i < locationCount; i++) {
                client.addLocation(locations.get(i), TaskId.valueOf("taskid.0.0." + i + ".0"));
            }
            client.noMoreLocations();
            Assert.assertFalse(client.isClosed());

            long startNanos = System.nanoTime();
            releaseResponses.countDown();

            // Wait (up to 5 seconds) for one page per location to be buffered.
            do {
                Assertions.assertLessThan(Duration.nanosSince(startNanos), new Duration(5.0d, TimeUnit.SECONDS));
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            } while (client.getStatus().getBufferedPages() < locationCount);
            Assert.assertEquals(client.getStatus().getBufferedPages(), locationCount);
            Assert.assertTrue(client.getStatus().getBufferedBytes() > 0);

            // Summing pagesReceived over clients that received exactly one page counts those clients.
            int pagesFromSinglePageClients = client.getStatus().getPageBufferClientStatuses().stream()
                    .filter(status -> status.getPagesReceived() == 1)
                    .mapToInt(PageBufferClientStatus::getPagesReceived)
                    .sum();
            Assert.assertEquals(pagesFromSinglePageClients, locationCount);
            Assert.assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);

            // Drain every page from every location.
            for (int i = 0; i < locationCount * pagesPerLocation; i++) {
                Assert.assertNotNull(getNextPage(client));
            }

            // Wait (up to 10 seconds from the same start) until 64 requests have been recorded:
            // the 16 initial ones plus the 48 added to the expectation below.
            do {
                Assertions.assertLessThan(Duration.nanosSince(startNanos), new Duration(10.0d, TimeUnit.SECONDS));
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            } while (processor.getRequestMaxSizes().size() < 64);
            for (int i = 0; i < 48; i++) {
                expectedMaxSizes.add(maxResponseSize);
            }
            Assert.assertEquals(processor.getRequestMaxSizes(), expectedMaxSizes);
        }
    }

    /**
     * Verifies {@code removeRemoteSource}: with two locations registered and one page from the
     * first already buffered, removing the first source still lets the buffered page be read
     * but yields nothing further from it. The second location is then served and drained, and
     * after {@code noMoreLocations()} both page-buffer clients must end up closed.
     */
    @Test
    public void testRemoveRemoteSource() throws Exception {
        DataSize bufferCapacity = new DataSize(1.0d, DataSize.Unit.BYTE);
        DataSize maxResponseSize = new DataSize(1.0d, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI firstLocation = URI.create("http://localhost:8081/foo.0.0.0.0");
        TaskId firstTask = TaskId.valueOf("foo.0.0.0.0");
        URI secondLocation = URI.create("http://localhost:8082/bar.0.0.0.0");
        TaskId secondTask = TaskId.valueOf("bar.0.0.0.0");

        // Only the first location has pages at this point, and it is never marked complete.
        for (int positions = 1; positions <= 3; positions++) {
            processor.addPage(firstLocation, createPage(positions));
        }

        ExchangeClient client = createExchangeClient(processor, bufferCapacity, maxResponseSize);
        client.addLocation(firstLocation, firstTask);
        client.addLocation(secondLocation, secondTask);
        Assert.assertFalse(client.isClosed());

        // Wait until the first page from the first location is buffered.
        waitUntilEquals(() -> Integer.valueOf(client.getStatus().getBufferedPages()), 1, new Duration(5.0d, TimeUnit.SECONDS));
        Assert.assertEquals(client.getStatus().getBufferedPages(), 1);

        // The already-buffered page is still delivered after removal; nothing else follows.
        client.removeRemoteSource(firstTask);
        assertPageEquals(getNextPage(client), createPage(1));
        Assert.assertNull(client.pollPage());
        Assert.assertEquals(client.getStatus().getBufferedPages(), 0);

        // Now serve the second location.
        for (int positions = 4; positions <= 6; positions++) {
            processor.addPage(secondLocation, createPage(positions));
        }
        processor.setComplete(secondLocation);

        for (int positions = 4; positions <= 6; positions++) {
            Assert.assertFalse(client.isClosed());
            assertPageEquals(getNextPage(client), createPage(positions));
        }
        Assert.assertFalse(MoreFutures.tryGetFutureValue(client.isBlocked(), 10, TimeUnit.MILLISECONDS).isPresent());
        Assert.assertFalse(client.isClosed());

        client.noMoreLocations();
        // This test has no TestNG timeOut, so bound the wait for the asynchronous close
        // instead of spinning forever; waitUntilEquals fails the test if the deadline passes.
        waitUntilEquals(client::isClosed, true, new Duration(10.0d, TimeUnit.SECONDS));

        ExchangeClientStatus finalStatus = client.getStatus();

        Optional<PageBufferClientStatus> firstStatus = finalStatus.getPageBufferClientStatuses().stream()
                .filter(status -> status.getUri().equals(firstLocation))
                .findFirst();
        Assert.assertTrue(firstStatus.isPresent());
        assertStatus(firstStatus.get(), "closed", "not scheduled");

        Optional<PageBufferClientStatus> secondStatus = finalStatus.getPageBufferClientStatuses().stream()
                .filter(status -> status.getUri().equals(secondLocation))
                .findFirst();
        Assert.assertTrue(secondStatus.isPresent());
        assertStatus(secondStatus.get(), "closed", "not scheduled");
    }

    /**
     * Builds a single-block page from {@code BlockAssertions.createLongSequenceBlock(0, size)}.
     *
     * @param size upper bound passed to the sequence-block factory
     */
    private static Page createPage(int size) {
        Block[] blocks = {BlockAssertions.createLongSequenceBlock(0, size)};
        return new Page(blocks);
    }

    /**
     * Waits up to 100 seconds for the client to become unblocked and then polls one page.
     *
     * @return the polled page, or {@code null} if the wait yields no value or the poll
     *         returns nothing
     */
    private static SerializedPage getNextPage(ExchangeClient exchangeClient) {
        // Poll on the thread that completes the blocked future.
        Optional<SerializedPage> polled = MoreFutures.tryGetFutureValue(
                Futures.transform(exchangeClient.isBlocked(), ignored -> exchangeClient.pollPage(), MoreExecutors.directExecutor()),
                100,
                TimeUnit.SECONDS);
        return polled.orElse(null);
    }

    /**
     * Asserts that a received page is non-null and matches the expected page in position
     * count and, after deserialization with {@code PAGES_SERDE}, in channel count.
     * Block contents are not compared.
     */
    private static void assertPageEquals(SerializedPage actualPage, Page expectedPage) {
        Assert.assertNotNull(actualPage);
        Assert.assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount());

        Page deserialized = PAGES_SERDE.deserialize(actualPage);
        Assert.assertEquals(deserialized.getChannelCount(), expectedPage.getChannelCount());
    }

    /**
     * Asserts only the state and HTTP request state of a page-buffer client status.
     *
     * @param clientStatus     status under test
     * @param state            expected value of {@code getState()}
     * @param httpRequestState expected value of {@code getHttpRequestState()}
     */
    private static void assertStatus(PageBufferClientStatus clientStatus, String state, String httpRequestState) {
        String actualState = clientStatus.getState();
        Assert.assertEquals(actualState, state, "status");

        String actualHttpRequestState = clientStatus.getHttpRequestState();
        Assert.assertEquals(actualHttpRequestState, httpRequestState, "httpRequestState");
    }

    /**
     * Asserts every tracked field of a page-buffer client status, in this order: URI, state,
     * pages received, requests scheduled, requests completed, HTTP request state.
     *
     * @param clientStatus      status under test
     * @param location          expected value of {@code getUri()}
     * @param state             expected value of {@code getState()}
     * @param pagesReceived     expected value of {@code getPagesReceived()}
     * @param requestsScheduled expected value of {@code getRequestsScheduled()}
     * @param requestsCompleted expected value of {@code getRequestsCompleted()}
     * @param httpRequestState  expected value of {@code getHttpRequestState()}
     */
    private static void assertStatus(
            PageBufferClientStatus clientStatus,
            URI location,
            String state,
            int pagesReceived,
            int requestsScheduled,
            int requestsCompleted,
            String httpRequestState) {
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), state, "status");
        Assert.assertEquals(clientStatus.getPagesReceived(), pagesReceived, "pagesReceived");
        Assert.assertEquals(clientStatus.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        Assert.assertEquals(clientStatus.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        Assert.assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    /**
     * Polls {@code actualSupplier} every 10 ms until it yields a value equal to
     * {@code expected} or the timeout elapses, then asserts equality one final time so a
     * timeout surfaces as a normal assertion failure.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and polling
     * stops immediately; the final assertion still runs.
     *
     * @param actualSupplier source of the observed value; may be called many times
     * @param expected       value to wait for; must not be null
     * @param timeout        maximum time to keep polling
     */
    private <T> void waitUntilEquals(Supplier<T> actualSupplier, T expected, Duration timeout) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout.toMillis());
        // Compare via subtraction so the check stays correct if nanoTime wraps around.
        while (System.nanoTime() - deadlineNanos < 0) {
            if (expected.equals(actualSupplier.get())) {
                return;
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Assert.assertEquals(actualSupplier.get(), expected);
    }

    /**
     * Builds an {@link ExchangeClient} whose HTTP traffic is answered by the given mock
     * processor, wired to this class's shared executors and a fresh "test" memory context.
     *
     * @param processor       handler behind the {@code TestingHttpClient}
     * @param bufferCapacity  first size argument of the ExchangeClient constructor
     * @param maxResponseSize second size argument of the ExchangeClient constructor
     */
    private ExchangeClient createExchangeClient(MockExchangeRequestProcessor processor, DataSize bufferCapacity, DataSize maxResponseSize) {
        Duration oneMinute = new Duration(1.0d, TimeUnit.MINUTES);
        TestingHttpClient httpClient = new TestingHttpClient(processor, testingHttpClientExecutor);
        return new ExchangeClient(
                bufferCapacity,
                maxResponseSize,
                1,
                oneMinute,
                true,
                0.2d,
                httpClient,
                new TestingDriftClient(),
                scheduler,
                new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test"),
                pageBufferClientCallbackExecutor);
    }
}
