/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.flink.io.datastream;

import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.flink.io.spi.FlinkIoModule;
import org.apache.flink.statefun.flink.io.spi.SinkProvider;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * IO module that registers a single provider for both {@link SourceFunctionSpec#TYPE}
 * ingresses and {@link SinkFunctionSpec#TYPE} egresses.
 *
 * <p>The provider does not build anything itself: it returns whatever function the
 * spec's {@code delegate()} accessor yields.
 */
public class SourceSinkModule
implements FlinkIoModule {
    /**
     * Binds one shared {@link SinkSourceProvider} instance as the source provider for
     * {@link SourceFunctionSpec#TYPE} and as the sink provider for {@link SinkFunctionSpec#TYPE}.
     *
     * @param globalConfiguration global configuration map; not read by this module
     * @param binder binder on which the source and sink providers are registered
     */
    @Override
    public void configure(Map<String, String> globalConfiguration, FlinkIoModule.Binder binder) {
        SinkSourceProvider provider = new SinkSourceProvider();
        binder.bindSourceProvider(SourceFunctionSpec.TYPE, provider);
        binder.bindSinkProvider(SinkFunctionSpec.TYPE, provider);
    }

    /**
     * Provider that resolves ingress/egress specs to the function obtained from the
     * spec's {@code delegate()} accessor. It holds no fields, so one instance is shared
     * for both roles in {@link SourceSinkModule#configure}.
     */
    private static final class SinkSourceProvider
    implements SourceProvider,
    SinkProvider {
        private SinkSourceProvider() {
        }

        /**
         * Returns the source function obtained from the given ingress spec.
         *
         * @param spec ingress spec; must be a {@link SourceFunctionSpec}
         * @param <T> type parameter of the ingress spec and of the returned source function
         * @return the result of {@code delegate()} on the spec
         * @throws IllegalStateException if {@code spec} is not a {@link SourceFunctionSpec}
         *     (this includes {@code null}, which fails the {@code instanceof} check)
         */
        @Override
        public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
            if (!(spec instanceof SourceFunctionSpec)) {
                throw new IllegalStateException("spec " + spec + " is not of type SourceFunctionSpec");
            }
            // Raw cast kept as-is: the generic signature of SourceFunctionSpec is not
            // visible from this file. NOTE(review): presumably delegate() is typed on T; confirm.
            SourceFunctionSpec casted = (SourceFunctionSpec)spec;
            return casted.delegate();
        }

        /**
         * Returns the sink function obtained from the given egress spec.
         *
         * @param spec egress spec; must be a {@link SinkFunctionSpec}
         * @param <T> type parameter of the egress spec and of the returned sink function
         * @return the result of {@code delegate()} on the spec
         * @throws IllegalStateException if {@code spec} is not a {@link SinkFunctionSpec}
         *     (this includes {@code null}, which fails the {@code instanceof} check)
         */
        @Override
        public <T> SinkFunction<T> forSpec(EgressSpec<T> spec) {
            if (!(spec instanceof SinkFunctionSpec)) {
                // The message names the type that was actually checked (SinkFunctionSpec).
                throw new IllegalStateException("spec " + spec + " is not of type SinkFunctionSpec");
            }
            // Raw cast kept as-is: the generic signature of SinkFunctionSpec is not
            // visible from this file. NOTE(review): presumably delegate() is typed on T; confirm.
            SinkFunctionSpec casted = (SinkFunctionSpec)spec;
            return casted.delegate();
        }
    }
}

