简介
安装部署
以 RPM 包为例,可以从 Package ClickHouse 下载关键的包,通常是 clickhouse-server
clickhouse-common-static
clickhouse-client
就可以了。
安装后,会包含一个 /usr/lib/systemd/system/clickhouse-server.service
服务管理文件,默认与 systemd
相关的配置文件保存在 /etc/clickhouse-server
目录下,可以使用 goreman 进行部署。
配置文件
其中系统的配置文件在 /etc/clickhouse-server
目录下,包括了默认 config.xml
系统配置,以及 users.d/default-password.xml
用户配置。用户有如下几种配置方式:
----- 明文密码
<password>HelloCK</password>
----- 通过SHA256加密
echo -n HelloCK | openssl dgst -sha256
<password_sha256_hex>8757d5ea...</password_sha256_hex>
----- 通过double_sha1加密
echo -n HelloCK | openssl dgst -shal -binary | openssl dgst -shal
<password_double_sha1_hex>60a5a877...</password_double_shal_hex>
在线修改后无需重启,还可以修改监听地址 <listen_host>
配置,如果是 ::
则支持 IPv4 和 IPv6,而 0.0.0.0
则只支持 IPv4。另外,异常可以查看 /var/log/clickhouse-server/clickhouse-server.log
日志。
启动服务
后续可以直接通过 systemd
进行管理。
----- 启动服务,会通过watchdog进程进行保活
# systemctl start clickhouse-server
----- 建立连接,其中port对应服务配置中的tcp_port配置项
$ clickhouse-client --host=127.0.0.1 --port=9002 --user=default --password='YourPassword'
在官方 Example Datasets 包含了很多测试集,初始建议使用 UK Property Prices 这个数据集,暂时没有找到 CSV 格式的列名映射,离线直接通过 python3 -m http.server -d . 8080
启动一个简单的静态服务器。
目录结构
数据默认保存在 /var/lib/clickhouse/data
目录下,各个目录保存内容如下:
data 真正数据保存目录,符号连接指向 store 目录
metadata 保存相关的建表语句,符号连接指向 store 目录
如果自定义了 storage_configuration
配置参数,那么会保存在 disks
目录下,某些版本里的符号连接有问题。
端口
会启动如下几个端口:
8123
HTTP 协议,包括/dashboard
图形界面等。
Keeper
这里的配置文件很简单,可以在 CK 的进程内嵌入式部署,而且分成了 Shared 是否共享,也可以独立部署。其配置项比较简单,可以参考 Keeper 中的介绍。
CK 依赖 ZK 主要是为了解决两个场景:A) 分布式 DDL 执行;B) 复制表的节点间同步,包括了写入、Merge、Mutation 等。
另外,关于 ZK 的部署可以参考 ZooKeeper 基本介绍 内容。
使用技巧
变量
在内部表 system.settings
保存了相关的配置参数,可以通过 SET send_logs_level='trace'
类似方式设置,并通过 SHOW SETTINGS LIKE 'send_logs%'
查看。
MergeTree
这应该是最复杂也最常用的引擎,包括了基于此的扩展引擎。
Replacing
唯一键特性,根据建表语句ReplacingMergeTree(xxx)
中的xxx
版本信息删除重复数据,不指定则保存最后一行。Summing
在合并时会指定的字段进行累加,如果不指定则聚合所有的非排序键。Aggregating
通过预聚合提升读取性能,建表时针对列指定聚合函数。Collapsing
支持行级别的数据修改和删除,会先以新增方式保存数据,在合并过程中实际修改数据。VersionedCollapsing
同上,区别是通过版本修改。Graphite
时序数据的兼容实现。
建表时,可以通过 ORDER BY
和 PRIMARY KEY
指定排序键和主键,大部分场景只需要 ORDER BY
即可,而在 Summing
和 Aggregating
聚合场景中可以指定不同的值,不过要求 PRIMARY KEY
必须是 ORDER BY
的前缀列。
将颗粒 (granule) 作为并行的最小单位,在 DDL 中通过 index_granularity
参数指定,默认是 8192=8x1024
。
primary.idx 主键索引,一个未压缩的扁平数据结构
col.mrk 每个颗粒在数据文件中的偏移,包括了压缩和非压缩数据
索引
与 TP 类似 AP 也会存在主键索引,当查询命中主键时最快,但有些场景可能无法命中,对于 TP 来说会引入二级索引,但 AP 通常是列存,无法直接映射到具体的行,所以,在 ClickHouse 中引入了跳数索引。
分布式
原本的 CK 是单机引擎,因为高可用就有了 Replicate
复制到多个机器上,而为了便于查询则有了分布式引擎。支持多主写入,其处理逻辑相同,通过基于 ZK 实现日志复制,包括了主副本选举、副本状态感知、操作日志分发、任务队列、Block 去重等。
目录结构
这里主要是 ZK 的元数据组织结构,通常以 /clickhouse/tables/{shard}/table_name
作为表级别的路径。
metadata 元数据,包括主键、分区键等
columns 列信息,含名称、数据类型
replicas/ 副本信息,在建表时指定,各副本执行本地任务会使用
replicas/{replica}/log_pointer 日志复制的偏移
replicas/{replica}/mutation_pointer Mutation 指针
replicas/{replica}/queue 执行过程以队列方式实现,用于顺序执行
leader_election/ 用于副本的选举,会主导 Merge Mutation 操作,主完成后复制到其它副本
blocks/ 块Hash信息,与log对应,用于block去重
block_numbers/
quorum/ 超过 quorum 指定数量之后整个写入才算成功
log/ 核心操作日志 (Insert Merge Drop),日志顺序递增
log/log-0000000001 保存了创建时间、源、BlockID 等信息
mutations/ 与 log 类似,对应 Alter Delete/Update 操作
DDL
分布式 DDL 实现依赖 ZooKeeper 组件,在 /clickhouse/task_queue/ddl/{task}
中同时包含了 active
和 finished
两个节点,用来保存未完成和已完成的任务,相关的配置如下,任务执行可以查看 system.distributed_ddl_queue
系统表。
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path> // ZK中的保存路径
<max_tasks_in_queue>3</max_tasks_in_queue> // 队列中的最大任务数
<cleanup_delay_period>60</cleanup_delay_period> // 检查 DDL 记录清理间隔
<task_max_lifetime>604800</task_max_lifetime> // 已完成节点的最大存活时间
</distributed_ddl>
注意,当超过了队列长度之后,并不会抛出异常。
ASTQueryWithOnCluster 分布式 SQL 父类,对应 ON CLUSTER 语句
|
常见操作
------ 查看集群信息
SELECT
cluster, shard_num, replica_num, host_name, port, is_local
FROM
system.clusters;
TCPHandler::run() 通过 POCO 库建立 Server
|-TCPHandler.runImpl()[Server/TCPHandler.cpp]
|-TCPHandler::receiveHello()/sendHello() 会有个初始化的交互
|-TCPHandler::receivePacket() <STEP#1> 接收报文,建立连接握手成功后,会在一个循环里处理
|-DB::executeQuery() [Interpreters/executeQuery.cpp] 开始处理请求
| |-DB::executeQueryImpl() 真正执行,包括解析生成AST、重写等,通过 Interpreter 构建 Pipeline,返回AST和解析后的结果
| |-DB::parseQuery() <STEP#2> 解析语句生成AST,支持不同的协议的解析
| |-InterpreterFactory::get() <STEP#3> 创建执行的流水线,根据上述的AST创建对应的 Interpreter(解释器),这里会根据不同的 SQL 类型构建对应的对象,一般是通过 make_unique 创建
| | |-InterpreterSelectQuery() 最基础的构造函数实现很复杂,详见与之对应的文件
| | |-checkStackSize()
| | |-InterpreterSelectQuery::initSettings()
| | |-RewriteCountDistinctFunctionVisitor() 开启了 count_distinct_optimization 参数
| | |-RewriteUniqToCountVisitor() 开启了 optimize_uniq_to_count 参数
| | |-InterpreterSelectQuery.analyze() 这里实际上是一个闭包实现
| | |-TreeRewriter::analyzeSelect()
| | | |-TreeRewriter::renameDuplicatedColumns() 对一些别名等进行重命名
| | | |-TreeRewriter::translateQualifiedNames()
| | | |-TreeRewriter::removeUnneededColumnsFromSelectClause() 消除select子句后的冗余列
| | | |-TreeRewriter::executeScalarSubqueries() 执行标量子查询,并且用常量替代子查询结果
| | |-InterpreterSelectQuery::getSampleBlockImpl()
| | |-MergeTreeData::getQueryProcessingStage()
| | |-MergeTreeData::getQueryProcessingStageWithAggregateProjection()
| | |-selectBestProjection()
| | |-MergeTreeDataSelectExecutor::estimateNumMarksToRead()
| | |-ReadFromMergeTree::selectRangesToRead()
| | |-ReadFromMergeTree::selectRangesToReadImpl()
| | |-MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes()
| |>>>.execute() 根据不同的语法开始执行
| |===InterpreterSelectQuery::execute() 构建查询计划
| | |-InterpreterSelectQuery::buildQueryPlan()
| | | |-InterpreterSelectQuery::executeImpl()
| | | |-InterpreterSelectQuery::executeFetchColumns() 从存储读取数据
| | | | |-IStorage::read() 开始读取,会调用具体的实现,可以查看 public IStorage 的实现类
| | | | |-StorageMergeTree::read()
| | | | |-MergeTreeDataSelectExecutor::read()
| | | | |-MergeTreeDataSelectExecutor::readFromParts()
| | | | |-new ReadFromMergeTree() 通过 make_unique 创建,后续通过 InterpreterSelectWithUnionQuery.execute() 触发
| | | |-InterpreterSelectQuery::executeWhere()
| | | |-InterpreterSelectQuery::executeAggregation()
| | | |-InterpreterSelectQuery::executeDistinct()
| | |-QueryPlan::buildQueryPipeline() 构建 QueryPipeline
| |===InterpreterSelectWithUnionQuery::execute() 构建查询计划
| | |-InterpreterSelectWithUnionQuery::buildQueryPlan() 构建查询计划
| | |-QueryPlan::buildQueryPipeline() 构建 QueryPipeline,以 DFS 方式遍历 QueryPlan,会先调用叶子节点
| | | |-QueryPlan::optimize()
| | | |-ITransformingStep::updatePipeline()
| | | | |-ExpressionStep::transformPipeline() 如果某个节点类型是 ExpressionStep
| | | | |-ExpressionActions() 以 make_shared 方式构建对象
| | | | |-ActionsDAG::compileExpressions() 这里需要开启 USE_EMBEDDED_COMPILER 宏
| | | | |-ActionsDAG::compileFunctions()
| | | | | |-ExpressionJIT::getCompilableDAG() 获取类型为 CompileDAG 的对象
| | | | | |-ExpressionJIT::compile()
| | | | | |-LLVMFunction() 同样会通过 make_shared 创建
| | | | | |-compileFunction(CHJIT, const IFunctionBase) 实现详见JIT/compileFunction.cpp代码
| | | | | |-CHJIT::compileModule(std::function) 这里传入的是函数,负责将Module编译成机器码,并建立一个函数名称与函数地址的映射关系
| | | | | |-CHJIT::createModuleForCompilation() 创建 llvm::Module 对象并配置
| | | | | |-CHJIT::compileFunction(llvm::Module, const IFunctionBase) 调用入参指定的函数,生成 IR 的具体函数在 src/Interpreters/JIT/compileFunction.cpp
| | | | | | |>>>IFunctionBase::compile() 这里会调用该函数,只有 LLVMFunction 实现了该方法
| | | | | | |===LLVMFunction::compile()
| | | | | | |-CompileDAG::compile()
| | | | | | |>>>IFunctionBase::compile() 如果是 CompileType::FUNCTION 类型
| | | | | | |===IFunction::compile()
| | | | | | |-FunctionBinaryArithmetic::compileImpl() 这里会调用操作符的方法
| | | | | | |-PlusImpl::compile() 这里是 plus 方法,这里最终调用的就是LLVM提供的方法
| | | | | |-CHJIT::compileModule(llvm::Module) 在 llvm::Module 基础上封装出 CompiledModule 模块,便于函数调用完成后生成的代码相关内存析构
| | | | | |-CHJIT::runOptimizationPassesOnModule() 设置默认的编译参数
| | | | | |-JITCompiler::compile() 代码生成,结果保存在 llvm::MemoryBuffer 中
| | | | | |-llvm::legacy::PassManager::run()
| | | | |-ActionsDAG::removeUnusedActions()
| | | |-ISourceStep::updatePipeline() 这里就是数据源了
| | | |-ReadFromMergeTree.initializePipeline()
| | | |-Pipe.addSimpleTransform() 这里会在匿名函数中通过make_shared创建如下对象
| | | |-ExpressionTransform() 当执行 a+b*c+5 类似的表达式时,在物理查询计划中就是该步骤,会在初始化列表中调用如下函数
| | | |-ExpressionTransform::transformHeader()
| | | |-ActionsDAG::updateHeader()
| | | |-ActionsDAG::executeActionForHeader() 这里是个函数类型会调用如下方法
| | | |-IExecutableFunction::execute()
| | | |-IExecutableFunction::executeWithoutSparseColumns()
| | | |-IExecutableFunction::executeWithoutLowCardinalityColumns()
| | | |-IExecutableFunction::executeImpl()
| | | |-LLVMExecutableFunction::executeImpl() 这里会执行生成的代码
| | |-QueryPipelineBuilder::getPipeline() 最后生成
| |===InterpreterInsertQuery.execute()[Interpreters/InterpreterInsertQuery.cpp]
| |-InterpreterInsertQuery.getTable() 会从 DB 缓存对象中获取
| |-InterpreterInsertQuery.buildInsertSelectPipeline() 采用 INSERT SELECT 方式
| |-InterpreterInsertQuery.buildInsertPipeline() 只使用 INSERT 这里仅解析该路径
| |-InterpreterInsertQuery.buildPreAndSinkChains()
| |-InterpreterInsertQuery.buildSink()
| |>>>Storage.write()
| |===StorageMergeTree.write() 新建 MergeTreeSink 对象
| |-InterpreterInsertQuery.buildPushingToViewsChain() 物化视图处理
|-TCPHandler::processInsertQuery() 处理 Insert 请求结果
|-TCPHandler::processOrdinaryQueryWithProcessors() 处理普通的并发查询执行结果
|-QueryPipeline::getHeader() 先发送头信息给客户端
|-PullingAsyncPipelineExecutor::pull() 循环获取数据发送给客户端
Server::main()
|-registerStorages()
|-registerStorageMerge() 会返回 StorageMerge
|-registerStorageMergeTree::create()
作为列式数据库,以 Block 为单位读取数据,包括了 `IBlockOutputStream` 和 `IBlockInputStream` 分别负责写入和读取。
https://zhuanlan.zhihu.com/p/590261481 ClickHouse分布式DDL执行原理分析
executeDDLQueryOnCluster() 执行分布式 DDL 语句
|-DDLWorker::enqueueQuery() 将变更信息添加到 ZK 中
|-getDistributedDDLStatus() 监控执行情况,应该是通过 DDLQueryStatusSource 实现,不过每搞懂
DDLWorker::runMainThread() 获取任务信息并执行
|-DDLWorker::processTask()
|-DDLWorker::taskShouldBeExecutedOnLeader() 是否需要在 Leader 上运行
|-DDLWorker::tryExecuteQueryOnLeaderReplica()
|-DDLWorker::tryExecuteQuery()
|-executeQuery() 开始真正执行,此时就是非分布式 DDL 操作了
ReplicatedMergeTreeSinkImpl::consume() 复制表实现
|-MergeTreeData::delayInsertOrThrowIfNeeded() 当写入 Block 过多时会延迟写入
|-ReplicatedMergeTreeSinkImpl::checkQuorumPrecondition() 如果设定了 quorum 提交,会进行一系列检查
|-MergeTreeDataWriter::splitBlockIntoParts()
|-MergeTreeDataWriter::writeTempPart() 写入本地磁盘,包括计算 Checksum
|
|-ReplicatedMergeTreeSinkImpl::finishDelayedChunk()
|-ReplicatedMergeTreeSinkImpl::commitPart()
|-commit_new_part_stage()
| |-ReplicatedMergeTreeSinkImpl::detectConflictInAsyncBlockIDs()
| |-StorageReplicatedMergeTree::allocateBlockNumber()
| |-StorageReplicatedMergeTree::getCommitPartOps() 一般是 GET 请求
| |-MergeTreeData::renameTempPartAndAdd() 将临时目录修改为正式目录
|-resolve_duplicate_stage()
StorageReplicatedMergeTree() 构造函数中会启动相关的线程
|-StorageReplicatedMergeTree::queueUpdatingTask() 定时拉取任务
| |-ReplicatedMergeTreeQueue::pullLogsToQueue() 将任务保存在 ZK 的 queue 队列中
|-StorageReplicatedMergeTree::mutationsUpdatingTask() 定时拉取任务
BackgroundJobsAssignee::threadFunc()
|>>>MergeTreeData::scheduleDataProcessingJob()
|===StorageMergeTree::scheduleDataProcessingJob() Merge 合并
|===StorageReplicatedMergeTree::scheduleDataProcessingJob() 同步主节点数据
| |-StorageReplicatedMergeTree::processQueueEntry()
| |-StorageReplicatedMergeTree::executeLogEntry()
| |-StorageReplicatedMergeTree::executeDropRange() 处理 DROP_RANGE DROP_PART 操作
| |-StorageReplicatedMergeTree::executeReplaceRange() 处理 REPLACE_RANGE 操作
| |-StorageReplicatedMergeTree::executeFetch() 处理 GET_PART 操作
| | |-StorageReplicatedMergeTree::fetchPart() 从其它节点读取分区数据
| | |-MergeTreeData::renameTempPartAndReplace() 将文件写入本地并更新内存 Meta 信息
| | |-StorageReplicatedMergeTree::checkPartChecksumsAndCommit() 获取并提交
| |-StorageReplicatedMergeTree::executeMetadataAlter() 处理 ALTER_METADATA 操作
|-MergeTreeData::scheduleDataMovingJob() 含 TTL、移动 Partition 等操作
ReplicatedMergeTreeQueue
MergeTreeSink::consume() 写入的最终实现
|-MergeTreeDataWriter::splitBlockIntoParts() 将数据按照分区进行划分多个 Block
|-MergeTreeDataWriter::writeTempPart() 遍历所有上述切割的 Block ,并将 Block 写入临时 Part 目录,包括计算 Checksum
| |-MergeTreeDataWriter::writeTempPartImpl()
| |-MinMaxIndex::update() 计算 Block 与 Partition 相关字段的 MinMax 统计值
| |-MergeTreeDataWriter::stableGetPermutation() 根据 SORT BY 字句进行排序
| |-MergeTreeDataWriter::updateTTL() 计算 TTL 相关字段的 MinMax 统计值
| |-MergedBlockOutputStream::writeWithPermutation() 写入 Partition
| | |-MergedBlockOutputStream::writeImpl()
| | |-Block::checkNumberOfRows() 检查各个列的行数是否相同
| | |-MergeTreeDataPartWriterWide::write() 真正写入数据
| | |-MergeTreeDataPartWriterWide::computeIndexGranularity()
| | |-MergeTreeDataPartWriterWide::writeColumn() 对所有字段分批写入
| | | |-MergeTreeDataPartWriterWide::writeSingleGranule() 写入单个 Granule 的数据
| | | |-MergeTreeDataPartWriterWide::flushMarkToFile() 对应 mrk 索引写入
| | |-MergeTreeDataPartWriterWide::calculateAndSerializePrimaryIndex()
| | |-MergeTreeDataPartWriterWide::calculateAndSerializeSkipIndices()
| | |-MergeTreeDataPartWriterWide::calculateAndSerializeStatistics()
| | |-MergeTreeDataPartWriterWide::shiftCurrentMark()
| |-MergedBlockOutputStream::finalizePartAsync() 异步写入
|-MergeTreeDataWriter::finishDelayedChunk()
|-MergeTreeData::renameTempPartAndAdd() 将临时目录挂载到表空间
|-BackgroundJobsAssignee::trigger() 触发后台的任务调度,例如 Merge
IResolvedFunction
|-IFunctionBase
|-FunctionExpression
// src/Functions/plus.cpp
static inline llvm::Value * compile(llvm::IRBuilder<> & b, llvm::Value * left, llvm::Value * right, bool)
{
return left->getType()->isIntegerTy() ? b.CreateAdd(left, right) : b.CreateFAdd(left, right);
}
如何调用已编译的函数
ISourceStep::updatePipeline()
执行的时候第一个参数是记录行数,第二个则是通过 ColumnData.data() 获取的值。
ReadFromMergeTree::read()
ReadFromMergeTree::readFromPoolParallelReplicas()
MergeTreeSelectProcessor() 构造函数
MergeTreeSelectProcessor::getPrewhereActions()
DB::tryBuildPrewhereSteps()
removeDuplicateColumns()
StatementImpl::execute()
|-StatementImpl::compile()
|-StatementImpl::compileImpl()
|-compile()
compileFunction()
LLVMFunction.prepare()
https://zhuanlan.zhihu.com/p/461631180
https://www.ihnfsa.com/database/clickhouse-jit-source-code/
在 CHJIT::compileFunction(llvm::Module, const IFunctionBase) 中会创建 LLVM 函数,
ClientBase::executeMultiQuery()
ClientBase::processParsedSingleQuery()
ClientBase::processOrdinaryQuery()
CHJIT::deleteCompiledModule()
IStorage
|-MergeTreeData
|-StorageMergeTree
CompiledExpressionCacheEntry
|-CompiledAggregateFunctionsHolder
|-CompiledFunctionHolder
|-CompiledSortDescriptionFunctionHolder
参考
- ClickHouse Monitoring 不错的观测 system 元数据信息,还有 QRYN 基于 CK 的观测系统实现。