聊聊flink的ConnectionManager

news/2024/7/15 16:31:34 标签: 大数据, java, netty

本文主要研究一下flink的ConnectionManager

ConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java

public interface ConnectionManager {

    void start(ResultPartitionProvider partitionProvider,
                TaskEventDispatcher taskEventDispatcher) throws IOException;

    /**
     * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;

    /**
     * Closes opened ChannelConnections in case of a resource release.
     */
    void closeOpenChannelConnections(ConnectionID connectionId);

    int getNumberOfActiveConnections();

    int getDataPort();

    void shutdown() throws IOException;

}
  • ConnectionManager定义了start、shutdown、closeOpenChannelConnections等方法用于管理physical connections;它有两个子类,一个是LocalConnectionManager,一个是NettyConnectionManager

LocalConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java

public class LocalConnectionManager implements ConnectionManager {

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
        return null;
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {}

    @Override
    public int getNumberOfActiveConnections() {
        return 0;
    }

    @Override
    public int getDataPort() {
        return -1;
    }

    @Override
    public void shutdown() {}
}
  • LocalConnectionManager实现了ConnectionManager接口,不过它的实现基本是空操作

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • NettyConnectionManager实现了ConnectionManager接口;它的构造器使用NettyConfig创建了NettyServer、NettyClient、NettyBufferPool,同时使用NettyClient创建了PartitionRequestClientFactory
  • start方法创建了NettyProtocol,同时初始化NettyClient、NettyServer;shutdown方法则关闭NettyClient、NettyServer;closeOpenChannelConnections则是使用partitionRequestClientFactory.closeOpenChannelConnections来关闭指定的connectionId
  • createPartitionRequestClient方法通过partitionRequestClientFactory.createPartitionRequestClient来创建PartitionRequestClient

PartitionRequestClientFactory

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

class PartitionRequestClientFactory {

    private final NettyClient nettyClient;

    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * Atomically establishes a TCP connection to the given remote address and
     * creates a {@link PartitionRequestClient} instance for this connection.
     */
    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        Object entry;
        PartitionRequestClient client = null;

        while (client == null) {
            entry = clients.get(connectionId);

            if (entry != null) {
                // Existing channel or connecting channel
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient) entry;
                }
                else {
                    ConnectingChannel future = (ConnectingChannel) entry;
                    client = future.waitForChannel();

                    clients.replace(connectionId, future, client);
                }
            }
            else {
                // No channel yet. Create one, but watch out for a race.
                // We create a "connecting future" and atomically add it to the map.
                // Only the thread that really added it establishes the channel.
                // The others need to wait on that original establisher's future.
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = clients.putIfAbsent(connectionId, connectingChannel);

                if (old == null) {
                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                    client = connectingChannel.waitForChannel();

                    clients.replace(connectionId, connectingChannel, client);
                }
                else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel) old).waitForChannel();

                    clients.replace(connectionId, old, client);
                }
                else {
                    client = (PartitionRequestClient) old;
                }
            }

            // Make sure to increment the reference count before handing a client
            // out to ensure correct bookkeeping for channel closing.
            if (!client.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionId, client);
                client = null;
            }
        }

        return client;
    }

    public void closeOpenChannelConnections(ConnectionID connectionId) {
        Object entry = clients.get(connectionId);

        if (entry instanceof ConnectingChannel) {
            ConnectingChannel channel = (ConnectingChannel) entry;

            if (channel.dispose()) {
                clients.remove(connectionId, channel);
            }
        }
    }

    int getNumberOfActiveClients() {
        return clients.size();
    }

    /**
     * Removes the client for the given {@link ConnectionID}.
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        clients.remove(connectionId, client);
    }

    //......
}
  • PartitionRequestClientFactory的构造器需要一个NettyClient;它使用ConcurrentHashMap在内存维护了一个ConnectionID与PartitionRequestClient或ConnectingChannel的映射关系
  • createPartitionRequestClient方法会先从ConcurrentHashMap查找是否有对应ConnectionID的PartitionRequestClient或ConnectingChannel,如果存在且是PartitionRequestClient实例则返回,如果存在且是ConnectingChannel实例则调用ConnectingChannel.waitForChannel等待PartitionRequestClient,然后替换对应ConnectionID在ConcurrentHashMap的值为PartitionRequestClient;如果ConcurrentHashMap没有对应ConnectionID的值,则会创建一个ConnectingChannel,然后放入到ConcurrentHashMap中,同时获取old object,如果old为null,则使用nettyClient.connect进行连接,然后获取PartitionRequestClient,之后替换ConcurrentHashMap中的值;如果old是ConnectingChannel则调用ConnectingChannel.waitForChannel等待PartitionRequestClient,然后替换ConcurrentHashMap中的值;在返回PartitionRequestClient之前会通过client.incrementReferenceCounter()来递增引用,如果递增不成功则调用destroyPartitionRequestClient,返回null,递增成功则返回PartitionRequestClient
  • closeOpenChannelConnections方法则判断,如果是ConnectingChannel,则调用ConnectingChannel.dispose,成功之后从ConcurrentHashMap中移除

ConnectingChannel

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

    private static final class ConnectingChannel implements ChannelFutureListener {

        private final Object connectLock = new Object();

        private final ConnectionID connectionId;

        private final PartitionRequestClientFactory clientFactory;

        private boolean disposeRequestClient = false;

        public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
            this.connectionId = connectionId;
            this.clientFactory = clientFactory;
        }

        private boolean dispose() {
            boolean result;
            synchronized (connectLock) {
                if (partitionRequestClient != null) {
                    result = partitionRequestClient.disposeIfNotUsed();
                }
                else {
                    disposeRequestClient = true;
                    result = true;
                }

                connectLock.notifyAll();
            }

            return result;
        }

        private void handInChannel(Channel channel) {
            synchronized (connectLock) {
                try {
                    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                    partitionRequestClient = new PartitionRequestClient(
                        channel, clientHandler, connectionId, clientFactory);

                    if (disposeRequestClient) {
                        partitionRequestClient.disposeIfNotUsed();
                    }

                    connectLock.notifyAll();
                }
                catch (Throwable t) {
                    notifyOfError(t);
                }
            }
        }

        private volatile PartitionRequestClient partitionRequestClient;

        private volatile Throwable error;

        private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            synchronized (connectLock) {
                while (error == null && partitionRequestClient == null) {
                    connectLock.wait(2000);
                }
            }

            if (error != null) {
                throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
            }

            return partitionRequestClient;
        }

        private void notifyOfError(Throwable error) {
            synchronized (connectLock) {
                this.error = error;
                connectLock.notifyAll();
            }
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                handInChannel(future.channel());
            }
            else if (future.cause() != null) {
                notifyOfError(new RemoteTransportException(
                        "Connecting to remote task manager + '" + connectionId.getAddress() +
                                "' has failed. This might indicate that the remote task " +
                                "manager has been lost.",
                        connectionId.getAddress(), future.cause()));
            }
            else {
                notifyOfError(new LocalTransportException(
                    String.format(
                        "Connecting to remote task manager '%s' has been cancelled.",
                        connectionId.getAddress()),
                    null));
            }
        }
    }
  • ConnectingChannel实现了netty的ChannelFutureListener接口,它的operationComplete方法在ChannelFuture是success的时候会调用handInChannel方法,该方法会创建PartitionRequestClient;waitForChannel方法则会等待partitionRequestClient创建成功然后返回

小结

  • ConnectionManager定义了start、shutdown、closeOpenChannelConnections等方法用于管理physical connections;它有两个子类,一个是LocalConnectionManager,一个是NettyConnectionManager
  • LocalConnectionManager实现了ConnectionManager接口,不过它的实现基本是空操作;NettyConnectionManager实现了ConnectionManager接口,它的构造器使用NettyConfig创建了NettyServer、NettyClient、NettyBufferPool,同时使用NettyClient创建了PartitionRequestClientFactory,start方法创建了NettyProtocol,同时初始化NettyClient、NettyServer,shutdown方法则关闭NettyClient、NettyServer,closeOpenChannelConnections则是使用partitionRequestClientFactory.closeOpenChannelConnections来关闭指定的connectionId,createPartitionRequestClient方法通过partitionRequestClientFactory.createPartitionRequestClient来创建PartitionRequestClient
  • PartitionRequestClientFactory的构造器需要一个NettyClient;它使用ConcurrentHashMap在内存维护了一个ConnectionID与PartitionRequestClient或ConnectingChannel的映射关系;ConnectingChannel实现了netty的ChannelFutureListener接口,它的operationComplete方法在ChannelFuture是success的时候会调用handInChannel方法,该方法会创建PartitionRequestClient;waitForChannel方法则会等待partitionRequestClient创建成功然后返回

doc

  • ConnectionManager

http://www.niftyadmin.cn/n/835512.html

相关文章

JavaScript作用域及作用域链

只有函数可以制造作用域结构&#xff0c;那么只要是代码&#xff0c;就至少有一个作用域&#xff0c;即全局作用域&#xff0c;凡是代码中有函数的&#xff0c;那么这个函数就构成另一个作用域&#xff0c;如果函数中还有函数&#xff0c;那么在这个函数中就又可以诞生一个作用…

winfrom图片放大器

废话不多说&#xff0c;直接上图看效果&#xff0c;左上角是原图片大小&#xff0c;右边是局部放大的效果 主要代码贴在下面&#xff0c;picBox是原图控件名&#xff0c;picBox_Show是放大控件名 private void picBox_Paint(object sender, PaintEventArgs e) { …

2019全球传统文化春节晚会录制完成 非遗与雅文化令人期待

图为晚会录制现场。 钟欣 摄 图为晚会录制现场。 钟欣 摄 2019全球传统文化春节晚会录制完成 “非遗”与“雅文化”双主打令人期待 中新网北京1月30日电 (记者 刘旭)2019全球传统文化春节晚会日前在京录制完成并顺利公演。春节期间&#xff0c;晚会将在国内外四十余个国家/地区…

微信h5页面video标签用法总结

写在前面 最近需要在公司公众号的微信网页里添加视频&#xff0c;这就用到了HTML5的新标签video。微信浏览器对不同手机的video标签兼容性不一致&#xff0c;导致我在实际使用中遇到了各种问题&#xff0c;因此在这里简要总结一下。 内联播放 video标签默认全屏播放&#xff0c…

JavaScript对象的概述、声明及使用

1.什么是对象&#xff1f; 生活中&#xff1a;万物皆对象&#xff0c;对象是一个具体的事物&#xff0c;一个具体的事物就会有行为和特征 举例&#xff1a;一辆汽车&#xff0c;一部手机 车是一类事物&#xff0c;门口停的那辆车才是对象特征&#xff1a;红色&#xff0c;四个…

程序员从零到月薪15K的转变,python200G资料分享

python具有强大和丰富的库&#xff0c;它常常被叫为胶水语言&#xff0c;能够把用其他语言制作的各种模块&#xff08;尤其是C/C&#xff09;很轻松地联结在一起。 Python语法简洁清晰&#xff0c;特色之一是强制用空白符(white space)作为语句缩进。 简单来说&#xff0c;pyth…

Grafana 6.0正式发布!新增查询工作流,全新独立Gauge面板

2月25日&#xff0c;Grafana 6.0稳定版正式发布。Grafana 6.0带来了很多令人兴奋的新特性和增强&#xff0c;同时提供了一个新的基于React的面板和插件架构&#xff0c;它们将从根本上改变Grafana平台的未来。 Grafana 6.0新特性 Explore——一个全新的专注于查询的工作流&…

Blog-08-《一周快速上手Kotlin For Android》-之ArrayList

在 Kotlin 中没有实现和 Java 一样的 List 集合&#xff0c;而是使用了和 Java 一样的 ArrayList 集合。Kotlin 中提供了以下四种函数方法来使用 ArrayList&#xff0c;分别是 1、listOf()2、listOfNotNull()3、mutableListOf()4、arraylistOf() 其中最主要的区别为可变的集合与…