为了实现将Hive或Spark中的RoaringBitmap数据同步到Clickhouse的Bitmap类型,本文深入分析了Clickhouse中Bitmap类型的底层结构以及与Hive/Spark的兼容性问题,并提供了解决方案。
1. Clickhouse的Bitmap结构解析
在Clickhouse中,Bitmap类型通常通过`groupBitmap`聚合函数生成。例如:
SELECT series_id, groupBitmapState(toUInt32(dvid)) AS bitmap_column
FROM dms_pds_flow_interest_dvid_city_day_all
GROUP BY series_id;
上述SQL的核心在于`groupBitmapState`函数,其底层由C++实现,具体位于`AggregateFunctionGroupBitmap.cpp`文件中。核心逻辑通过`createAggregateFunctionBitmap`创建了一个名为`AggregateFunctionGroupBitmapData`的类实例。
`AggregateFunctionGroupBitmapData`的内部结构如下:
- **高16位和低16位存储模型**:Clickhouse使用`roaring_array_t`结构体来管理Bitmap数据。每个Bitmap被划分为多个桶(bucket),每个桶对应一个高16位值,而低16位则存储在对应的容器中。
2. Hive/Spark中的RoaringBitmap实现
Hive中的Bitmap生成主要依赖自定义UDAF(User Defined Aggregation Function)。关键方法包括:
- `terminatePartial`:返回部分聚合结果。
- `merge`:合并两个部分结果。
- `terminate`:输出最终结果。
以Hive为例,其Bitmap最终以Java二进制数组形式返回。因此,要实现Hive与Clickhouse的Bitmap兼容,需解决如何将Hive的二进制数组有效转换为Clickhouse支持的格式。
3. Clickhouse的Bitmap存储机制
Clickhouse中的Bitmap数据存储分为两种模式:
- **SmallSet模式**:当基数小于等于32时,直接存储原始数据。
- **RoaringBitmap模式**:当基数超过32时,转换为压缩格式存储。
以下是关键代码片段:
void add(T value) {
if (isSmall()) {
if (!small.full()) {
small.insert(value);
} else {
toLarge();
rb->add(static_cast(value));
}
} else {
rb->add(static_cast(value));
}
}
写入序列化数据时,遵循以下格式:
1. 写入标识符(`UInt8`类型),`0`表示SmallSet,`1`表示RoaringBitmap。
2. 若为RoaringBitmap,先写入序列化字节大小(`VarInt`编码)。
3. 写入实际的字节数组。
4. Java实现Bitmap序列化
以下是基于Java的Bitmap序列化示例代码:
import org.roaringbitmap.RoaringBitmap;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
public class BitmapSerializer {
public static String serialize(RoaringBitmap bitmap) {
if (bitmap.getCardinality() <= 32) {
ByteBuffer buffer = ByteBuffer.allocate(4 * bitmap.getCardinality() + 2);
buffer.put((byte) 0); // SmallSet标识
buffer.put((byte) bitmap.getCardinality());
for (int value : bitmap.toArray()) {
buffer.putInt(value);
}
return Base64.getEncoder().encodeToString(buffer.array());
} else {
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
try (DataOutputStream dataStream = new DataOutputStream(byteArrayStream)) {
bitmap.serialize(dataStream);
byte[] serializedBitmap = byteArrayStream.toByteArray();
ByteBuffer buffer = ByteBuffer.allocate(serializedBitmap.length + 5);
buffer.put((byte) 1); // RoaringBitmap标识
writeVarInt(serializedBitmap.length, buffer);
buffer.put(serializedBitmap);
return Base64.getEncoder().encodeToString(buffer.array());
} catch (Exception e) {
throw new RuntimeException("Serialization failed", e);
}
}
}
private static void writeVarInt(int value, ByteBuffer buffer) {
while ((value & ~0x7F) != 0) {
buffer.put((byte) ((value & 0x7F) | 0x80));
value >>>= 7;
}
buffer.put((byte) value);
}
}
5. Spark批量写入Clickhouse
通过Spark SQL可批量处理RoaringBitmap数据并写入Clickhouse。具体步骤如下:
1. 使用自定义UDF对Bitmap数据进行序列化。
2. 将序列化后的字符串插入Clickhouse表。
3. 利用Clickhouse的物化视图功能自动解码并存储为Bitmap类型。
示例代码参考:[https://github.com/niutaofan/spark_bitmap.git](https://github.com/niutaofan/spark_bitmap.git)