ClickHouse ConnectionPool 链接池的优化

一 为什么需要线程池

  • 官方解答

是维护的数据库连接的缓存,以便在将来需要对数据库发出请求时可以重用连接。 连接池用于提高在数据库上执行命令的性能。为每个用户打开和维护数据库连接,尤其是对动态数据库驱动的网站应用程序发出的请求,既昂贵又浪费资源。在连接池中,创建连接之后,将连接放在池中并再次使用,这样就不必创建新的连接。如果所有连接都正在使用,则创建一个新连接并将其添加到池中。连接池还减少了用户必须等待创建与数据库的连接的时间。

ClickHouse 原生ConnectionPool 缺点

  1. ClickHouse 官方 对于Connnection的实现过于死板,ConnectionPool 只能适用于ClickHouse TCP Connenction

  class ConnectionPool : public IConnectionPool, private PoolBase<Connection>
  {
  public:
      using Entry = IConnectionPool::Entry;
      using Base = PoolBase<Connection>;
      ConnectionPool(unsigned max_connections_,
              const String & host_,
              UInt16 port_,
              const String & default_database_,
              const String & user_,
              const String & password_,
              const String & cluster_,
              const String & cluster_secret_,
              const String & client_name_,
              Protocol::Compression compression_,
              Protocol::Secure secure_,
              Int64 priority_ = 1)
         : Base(max_connections_,
          &Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
          host(host_),
          port(port_),
          default_database(default_database_),
          user(user_),
          password(password_),
          cluster(cluster_),
          cluster_secret(cluster_secret_),
          client_name(client_name_),
          compression(compression_),
          secure(secure_),
          priority(priority_)
      {
      }

2. PoolBase 构造函数需要继承自类ConnnectionPool 重新实现

    PoolBase(unsigned max_items_, Poco::Logger * log_)
       : max_items(max_items_), log(log_)
    {
        items.reserve(max_items);
    }
    /** Creates a new object to put into the pool. */
    virtual ObjectPtr allocObject() = 0;
  • 目的
    • 实现模版类,更加轻量化的实现

二 准备工作

基本C++概念
std::mutex  锁
std::unique_lock 唯一锁
std::lock_guard
std::shared_ptr 指针,带引用计数器 use_count
std::vector 数据
class 类
template class 模版类
网线限制参数
connection_timeout
send_timeout
receive_timeout
tcp_keep_alive_timeout
http_keep_alive_timeout 
secure_connection_timeout
handshake_timeout 
包参数
secure 安全模式 http?https
compression 数据传输是否压缩
INodeInfo 节点信息
ip   节点ip
role 节点的角色
IClusterInfo 集群信息
http/tcp port 集群访问的端口
user 用户名
password 密码
std::vector<NodeInfo> 集群节点
xxx 其他 
网络限制参数
网络传输参数

Connection

IClusterInfo
socket/client  Server 放提供的链接方式的client 的封装
​

ConnectionPool

三 类

3.1 集群信息
  • 1 NodeInfo
struct NodeInfo
{
    explicit NodeInfo(std::string host_, std::string role_ = "follower") : host(host_), role(role_) { }
    std::string host;
    std::string role;
};
using NodeInfoPtr = std::shared_ptr<NodeInfo>;
using NodeInfoPtrs = std::vector<NodeInfoPtr>;
  • 2 ClusterInfo
struct ClusterInfo
{
    enum class Compression
    {
        Disable = 0,
        Enable = 1,
    };
​
    enum class Secure
    {
        Disable = 0,
        Enable = 1,
    };
​
​
    explicit ClusterInfo(const Poco::Util::AbstractConfiguration & config, std::string config_name);
    //to do ,memory ClusterInfo(xxx);
    NodeInfoPtrs node_info_ptrs;
    std::int32_t port;
    std::string user = "root";
    std::string password;
    Secure security = Secure::Disable;
    Compression compression = Compression::Enable;
    ConnectionTimeouts timeouts;
};
using ClusterInfoPtr = std::shared_ptr<ClusterInfo>;
​
​
​
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
{
    bool is_secure = config.getBool("secure", false);
    security = is_secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
​
    host = config.getString("host", "localhost");
    port = config.getInt(
        "port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
​
    default_database = config.getString("database", "");
​
    /// changed the default value to "default" to fix the issue when the user in the prompt is blank
    user = config.getString("user", "default");
​
    bool password_prompt = false;
    if (config.getBool("ask-password", false))
    {
        if (config.has("password"))
            throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
        password_prompt = true;
    }
    else
    {
        password = config.getString("password", "");
        /// if the value of --password is omitted, the password will be set implicitly to "\\n"
        if (password == "\\n")
            password_prompt = true;
    }
    if (password_prompt)
    {
#if !defined(ARCADIA_BUILD)
        std::string prompt{"Password for user (" + user + "): "};
        char buf[1000] = {};
        if (auto * result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
            password = result;
#endif
    }
​
    /// By default compression is disabled if address looks like localhost.
    compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))
        ? Protocol::Compression::Enable : Protocol::Compression::Disable;
​
    timeouts = ConnectionTimeouts(
        Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
        Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
}
  • 3 ConnectionCache to do 数据缓存
class ConnectionCache
{
  
}
3.2 Connection
class IConnection
{
public:
    IConnection() = default;
​
    virtual ~IConnection() = default;
​
    virtual void connect() = 0;
    virtual void close() = 0;
​
    virtual void heartbeat() = 0;
​
    virtual void isLeader() = 0;
    virtual void getleader() = 0;
    virtual void getCluster() = 0;
​
    virtual void send() = 0;
    virtual void receive() = 0;
​
​
    virtual bool inUse() = 0;
};
xxxx
3.2 ClickHouse ConnectionPool
  • 优化
template<typename TObject>
class ConnectionPool {
public:
    using ObjectPtr = std::shared_ptr<TObject>;
    using ObjectPtrs = std::vector<ObjectPtr>;
​
    ObjectPtr get() {
        std::unique_lock<std::mutex> lock(object_mutex);
​
// 通过shared_ptr 直接简化封装,和use_count 判断object 是否在使用
        while (true) {
            for (auto object_ptr : object_ptrs)
                if (object_ptr.use_count() == 2)
                    return object_ptr;
​
            auto object_index = object_ptrs.size();
            if (object_index < max_pool_size) {
                alloObject(object_index);
                continue;
            }
​
            condition_variable.wait_for(lock, std::chrono::milliseconds(10));
        }
    }
​
    ConnectionPool(ClusterInfoPtr cluster_info_ptr_, size_t min_pool_size_, size_t max_pool_size_)
            : cluster_info_ptr(std::move(cluster_info_ptr_)), min_pool_size(min_pool_size_),
              max_pool_size(max_pool_size_) {
        reserve();
    }
​
​
    void reserve() {
        std::lock_guard<std::mutex> lock(object_mutex);
        for (size_t object_index = object_ptrs.size(); object_index < min_pool_size; object_index++)
            alloObject(object_index);
    }
​
​
    void alloObject(size_t object_index) {
        ObjectPtr object_ptr
                = std::make_shared<TObject>(
                        cluster_info_ptr->node_info_ptrs[object_index % cluster_info_ptr->node_info_ptrs.size()]);
        object_ptrs.emplace_back(object_ptr);
    }
    //to do 集群信息更新,更新ClusterInfo
​
    ~ConnectionPool() = default;
​
private:
    ClusterInfoPtr cluster_info_ptr;
    ObjectPtrs object_ptrs;
    size_t min_pool_size;
    size_t max_pool_size;
    std::mutex object_mutex;
    std::condition_variable condition_variable;
};
​

3.2 ClickHouse ConnectionPool

ClickHouse Object 管理

  1. 使用Entry 绑定Object 作代码调用的返回值
  2. Entry 来实际操作 Object
  3. 用户在使用的时候 必须接受Entry 的返回值
  4. 使用shared_ptr 来完成最终object 自我销毁
    /** What is given to the user. */
    class Entry
    {
    public:
        friend class PoolBase<Object>;

        Entry() = default; /// For deferred initialization.

        /** The `Entry` object protects the resource from being used by another thread.
          * The following methods are forbidden for `rvalue`, so you can not write a similar to
          *
          * auto q = pool.get()->query("SELECT .."); // Oops, after this line Entry was destroyed
          * q.execute (); // Someone else can use this Connection
          */
        Object * operator->() && = delete;
        const Object * operator->() const && = delete;
        Object & operator*() && = delete;
        const Object & operator*() const && = delete;

        Object * operator->() & { return &*data->data.object; }
        const Object * operator->() const & { return &*data->data.object; }
        Object & operator*() & { return *data->data.object; }
        const Object & operator*() const & { return *data->data.object; }

        bool isNull() const { return data == nullptr; }

        PoolBase * getPool() const
        {
            if (!data)
                throw DB::Exception("Attempt to get pool from uninitialized entry", DB::ErrorCodes::LOGICAL_ERROR);
            return &data->data.pool;
        }

    private:
        std::shared_ptr<PoolEntryHelper> data;

        explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
    };

    virtual ~PoolBase() = default;

    /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
    Entry get(Poco::Timespan::TimeDiff timeout)
    {
        std::unique_lock lock(mutex);

        while (true)
        {
            for (auto & item : items)
                if (!item->in_use)
                    return Entry(*item);

            if (items.size() < max_items)
            {
                ObjectPtr object = allocObject();
                items.emplace_back(std::make_shared<PooledObject>(object, *this));
                return Entry(*items.back());
            }

            LOG_INFO(log, "No free connections in pool. Waiting.");

            if (timeout < 0)
                available.wait(lock);
            else
                available.wait_for(lock, std::chrono::microseconds(timeout));
        }
    }

四 如何使用

eg: Redis

1 创建 RedisConnection

​
class RedisConnection : public IConnection
{
public:
    RedisConnection(NodeInfoPtr nodeInfoPtr);
    RedisConnection(ClusterInfoPtr clusterInfoPtr);
    ~RedisConnection();
​
//redis client object 
    xxxxx
};
2 ConnectionPool 模版 Client 实现
class RedisClient : public SConnectionPool<RedisConnection>
{
public:
    RedisClient(ClusterInfoPtr clusterInfoPtr_, size_t min_pool_size_, size_t max_pool_size_)
        : ConnectionPool(clusterInfoPtr_, min_pool_size_, max_pool_size_), cluster_info_ptr(clusterInfoPtr_)
    {
    }
​
    ~RedisClient() = default;
​
};

想较 ClickHouse Connection Pool 更加轻量化

demo 后面放到个人github

people1 method5
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  4
people1 method4
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  5
people1 method5
xxxxxxx  4
people1 method4
xxxxxxx  5
people1 method5
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  5
people1 method5
xxxxxxx  3
people1 method3
xxxxxxx  4
people1 method4
xxxxxxx  4
people1 method4
xxxxxxx  3
people1 method3
xxxxxxx  1
people1 method1
xxxxxxx  2
people1 method2
xxxxxxx  5
people1 method5
xxxxxxx  4
people1 method4

感谢阅读!

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
ClickHouse ConnectionPool 链接池的优化
是维护的数据库连接的缓存,以便在将来需要对数据库发出请求时可以重用连接。 连接池用于提高在数据库上执行命令的性能。为每个用户打开和维护数据库连接,尤其是对动态数...
<<上一篇
下一篇>>