doris使用minio来处理数据

使用 doris+minio

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
version: "3"
services:
  mysql:
    image: mysql:8.0.31
    volumes:
      - /data/mysql/:/var/lib/mysql
    environment:
      MYSQL_ROOT_PASSWORD: 123456
      MYSQL_DATABASE: doris-test
      MYSQL_PASSWORD: 123456
      TZ: Asia/Shanghai # 时区配置亚洲上海,解决了容器的时区问题!!!
    command:
      --character-set-server=utf8
    hostname: mysql
    ports:
      - "3306:3306"

  # jobmanager:
  #   image: apache/flink:1.15-scala_2.12
  #   volumes:
  #     - /data/flink/job/flink-doris-connector-1.15-1.2.1.jar:/opt/flink/lib/flink-doris-connector-1.15-1.2.1.jar
  #     - /data/flink/job/flink-sql-connector-mysql-cdc-2.2.1.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  #   expose:
  #     - "6123"
  #   ports:
  #     - "8081:8081"
  #   command: jobmanager
  #   environment:
  #     - JOB_MANAGER_RPC_ADDRESS=jobmanager

  # taskmanager:
  #   image: apache/flink:1.15-scala_2.12
  #   volumes:
  #     - /data/flink/task/flink-doris-connector-1.15-1.2.1.jar:/opt/flink/lib/flink-doris-connector-1.15-1.2.1.jar
  #     - /data/flink/task/flink-sql-connector-mysql-cdc-2.2.1.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  #   expose:
  #     - "6121"
  #     - "6122"
  #   depends_on:
  #     - jobmanager
  #   command: taskmanager
  #   links:
  #     - "jobmanager:jobmanager"
  #   environment:
  #     - JOB_MANAGER_RPC_ADDRESS=jobmanager

  fe:
    image: apache-doris:2.0.0-fe-hxf
    volumes:
      - /data/apache-doris/fe/doris-meta/:/opt/apache-doris/fe/doris-meta
      - /data/apache-doris/fe/log/:/opt/apache-doris/fe/log
      - ./fe.conf:/opt/apache-doris/fe/conf/fe.conf
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - FE_ID=1
      - TZ=Asia/Shanghai # 时区配置亚洲上海,解决了容器的时区问题!!!
    restart: on-failure
    ports:
      - "8030:8030"
      - "9030:9030"
    networks:
      doris_net:
      # 172.20.80.1 提示address is in use
        ipv4_address: 172.20.80.2
  zookeeper:
   image: 'bitnami/zookeeper:3.4.12-r68'
   hostname: zookeeper
   container_name: zookeeper
   ports:
     - "2181:2181"
   environment:
     - ALLOW_ANONYMOUS_LOGIN=yes
   networks:
      doris_net:
        ipv4_address: 172.20.80.4

  kafka:
   image: 'bitnami/kafka:2.0.0'
   hostname: kafkabroker
   container_name: kafkabroker
   ports:
     - "9092:9092"
   environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
     - ALLOW_PLAINTEXT_LISTENER=yes
   networks:
      doris_net:
        ipv4_address: 172.20.80.5
  be:
    image: apache-doris:2.0.0-be-hxf
    ulimits:
      nproc: 2000000
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - /data/apache-doris/be/storage/:/opt/apache-doris/be/storage
      - /data/apache-doris/be/log/:/opt/apache-doris/be/log
    environment:
      - FE_SERVERS=fe1:172.20.80.2:9010
      - BE_ADDR=172.20.80.3:9050
      - TZ=Asia/Shanghai # 时区配置亚洲上海,解决了容器的时区问题!!!
    depends_on:
      - fe
    restart: on-failure
    networks:
      doris_net:
        ipv4_address: 172.20.80.3
  doris--minio:
    image: minio/minio
    container_name: doris--minio
    hostname: doris--minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_REGION=xian
    ports:
      - 9001:9001
      - 9000:9000
    networks:
      doris_net:
        ipv4_address: 172.20.80.6
    command: ["server", "/data", "--console-address", ":9001"]
  doris--mc:
    depends_on:
      - doris--minio
    image: minio/mc
    container_name: doris--mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=xian
    networks:
      doris_net:
        ipv4_address: 172.20.80.7
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://doris--minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
       /usr/bin/mc rm -r --force minio/test;
        /usr/bin/mc rm -r --force minio/log;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc mb minio/test;
      /usr/bin/mc mb minio/log;
      /usr/bin/mc policy set public minio/warehouse;
      /usr/bin/mc policy set public minio/test;
      /usr/bin/mc policy set public minio/log;

      exit 0;
      "
networks:
  doris_net:
    ipam:
      config:
        - subnet: 172.20.80.0/24

image-20230810113107652

通过 doris-mc 可以创建 bucket

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 mysql -uroot -h 172.20.80.2 -P9030
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MySQL connection id is 102
Server version: 5.7.99 Doris version doris-2.0.0-alpha1-Unknown

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MySQL [(none)]> CREATE RESOURCE "remote_s3"
    -> PROPERTIES
    -> (
    ->     "type" = "s3",
    ->     "AWS_ENDPOINT" = "localhost:9000",
    ->     "AWS_REGION" = "xian",
    ->     "AWS_BUCKET" = "test",
    ->     "AWS_ROOT_PATH" = "/test/test001",
    ->     "AWS_ACCESS_KEY" = "minioadmin",
    ->     "AWS_SECRET_KEY" = "minioadmin",
    ->     "AWS_MAX_CONNECTIONS" = "50",
    ->     "AWS_REQUEST_TIMEOUT_MS" = "3000",
    ->     "AWS_CONNECTION_TIMEOUT_MS" = "1000"
    -> );
ERROR 1105 (HY000): Unexpected exception: Unable to execute HTTP request: Unexpected end of file from server
MySQL [(none)]>
这个报错是因为test.localhost 他是这样去访问的,所以需要添加

创建 resource

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
CREATE RESOURCE "remote_s3"
PROPERTIES
(
   "type" = "s3",
   "AWS_ENDPOINT" = "doris-minio:9000",
    "AWS_REGION" = "xian",
    "AWS_BUCKET" = "test",
    "AWS_ROOT_PATH" = "/test/test001",
    "AWS_ACCESS_KEY" = "admin",
    "AWS_SECRET_KEY" = "password",
    "AWS_MAX_CONNECTIONS" = "50",
    "AWS_REQUEST_TIMEOUT_MS" = "3000",
    "AWS_CONNECTION_TIMEOUT_MS" = "1000"
);
# 在docker的容器中添加dns
docker exec -it e6037735bc10931c782921924f35b010b71c9843d58f9a6a194e420c7809064b bash

root@e6037735bc10:/# echo "172.20.80.6 test.doris-minio">>/etc/hosts
root@e6037735bc10:/#
#一个小时后迁移数据。
CREATE STORAGE POLICY test_policy
PROPERTIES(
    "storage_resource" = "remote_s3",
    "cooldown_ttl" = "1h"
);
CREATE TABLE IF NOT EXISTS create_table_use_created_policy
(
    k1 BIGINT,
    k2 LARGEINT,
    v1 VARCHAR(2048)
)
UNIQUE KEY(k1)
DISTRIBUTED BY HASH (k1) BUCKETS 1
PROPERTIES(
    "storage_policy" = "test_policy",
    "replication_num" = "1"
);
#插入数据
 insert into create_table_use_created_policy values (10001,100001,'11');
 insert into create_table_use_created_policy values (10002,100001,'11');
 insert into create_table_use_created_policy values (10003,100001,'11');

一个小时后查看,在 minio 上果然有存储过来的数据。

image-20230810170110347

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CREATE RESOURCE "remote_s3"
PROPERTIES
(
   "type" = "s3",
   "AWS_ENDPOINT" = "doris-minio",
    "AWS_REGION" = "xian",
    "AWS_ACCESS_KEY" = "minio",
    "AWS_SECRET_KEY" = "minio123",
    "AWS_MAX_CONNECTIONS" = "50",
    "AWS_BUCKET" = "test",
    "AWS_REQUEST_TIMEOUT_MS" = "3000",
    "AWS_CONNECTION_TIMEOUT_MS" = "1000"
);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE REPOSITORY `minio`
WITH S3
ON LOCATION "s3://test"
PROPERTIES
(
   "AWS_ENDPOINT" = "http://doris-minio:9000",
   "AWS_ACCESS_KEY" = "minio",
   "AWS_SECRET_KEY"="minio123",
   "AWS_REGION" = "xian"
);

CREATE REPOSITORY `s3_repo`
WITH S3
ON LOCATION "s3://test"
PROPERTIES
(
    "s3.endpoint" = "http://doris-minio:9000",
    "s3.access_key" = "minio",
    "s3.secret_key"="minio123",
    "s3.region" = "xian",
    "use_path_style" = "true",
    "AWS_REGION" = "xian"
);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
创建biao
CREATE TABLE IF NOT EXISTS demo.example_tbl
(
    `user_id` LARGEINT NOT NULL COMMENT "用户id",
    `date` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
    `city` VARCHAR(20) COMMENT "用户所在城市",
    `age` SMALLINT COMMENT "用户年龄",
    `sex` TINYINT COMMENT "用户性别",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

curl  --location-trusted -u root: -T test.csv -H "expect:100-continue"  -H "column_separator:," http://127.0.0.1:8030/api/demo/example_tbl/_stream_load
select * from example_tbl;

给 doris 设置密码

1
SET PASSWORD FOR 'root' = PASSWORD('Password123@doris');

[1] 查看与修改内存

1
2
$ SHOW VARIABLES LIKE "%mem_limit%";          // 查看内存限制,默认为2GB
$ set global exec_mem_limit = 8589934592;

[2] 查看与修改超时时间

修改超时时间:Doris 修改超时时间-官方文档(opens new window)

1
2
$ SHOW VARIABLES LIKE "%query_timeout%";      // 查看超时时间,默认为300s
$ set global query_timeout = 6000;            // 修改超时时间为6000s

4. Doris 集群的基本使用

Doris 由于兼容 MySQL 协议,基本使用与 MySQL 差不多,可以使用 SQL 语句进行查询,但也有一些地方不同。

  • 虽然查询操作与 SQL 语句类似,但操作数据表的语句并不兼容,一些内容也不支持修改,因此使用 Navicat 进行操作的时候会看到一些报错。
  • Doris 的数据模型主要分为 3 类:Aggregate、Unique、Duplicate,详见:数据模型-Doris 官方文档
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 查询当前有哪些任务
show routine Load;

// 停止指定任务
STOP ROUTINE LOAD FOR your_db.your_job;

// 暂停指定任务
PAUSE ROUTINE LOAD FOR your_db.your_job;

// 暂停所有任务
PAUSE ALL ROUTINE LOAD;

// 重启指定任务
RESUME ROUTINE LOAD FOR your_db.your_job;

// 重启所有任务
RESUME ALL ROUTINE LOAD;

// 创建任务
CREATE ROUTINE LOAD your_db.your_job ON your_table COLUMNS (
	FIELD1,
	FIELD2,
	FIELD3
) PROPERTIES (
	"max_batch_interval" = "10",
	"max_batch_rows" = "100000000",
	"max_batch_size" = "524288000",
	"strict_mode" = "false",
	"format" = "json"
)
FROM
	KAFKA (
		"kafka_broker_list" = "kafka_ip:port",
		"kafka_topic" = "your_topic",
		"kafka_partitions" = "0",
		"kafka_offsets" = "0"
	)

4.3 Doris 数据查询调优

#4.3.1 查看 Doris 集群的数据量

由于 Doris 里的数据量太过于庞大,使用 count(1)的方式查询会超时,Doris 官方提供了查询数据量的方法。详见:SHOW-DATA(opens new window)

  • 功能:该语句用于展示数据量、副本数量以及统计行数。
  • 语法:SHOW DATA [FROM db_name[.table_name]] [ORDER BY ...];
  • 实例:SHOW DATA FROM example_db.my_table;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
这里是生产环境真实数据表查询返回的数据,只不过这里为了脱敏,将数据库名和表名改了。

+-----------+-----------+-----------+--------------+-----------------+
| TableName | IndexName | Size      | ReplicaCount | RowCount        |
+-----------+-----------+-----------+--------------+-----------------+
| my_table  | my_table  | 231.382GB | 4            | 233246375487    |
|           | Total     | 231.382GB | 4            | 233246375487    |
+-----------+-----------+-----------+--------------+-----------------+

可以看到Doris的数据压缩比是非常恐怖的,2332亿数据仅用了231.382GB存储空间。

4.3.2 数据查询耗时分析

需求情景:Doris 库里存储了 2332 亿数据,想要按条件将其全部取出用于分析,查询超时。

查询耗时分析:Doris 耗时分析-官方文档(opens new window)

1
$ set enable_profile=true;

1

然后登录 FE 的管理界面,选择到 QueryProfile 子菜单,可以看到该设置之后的 SQL 语句查询耗时。

Doris查询耗时分析

解决方案:

  • Doris 库表优化:通过添加索引、物化视图等方式去优化查询速度。
  • 查询方式优化:分批次去查询,将查询结果拼接起来,类似于 ES 的滚动查询。

#4.3.3 添加 Bitmap 索引

bitmap index 即位图索引,是一种快速数据结构,能够加快查询速度。

基本语法:

1
2
3
4
5
6
// 创建Bitmap索引
CREATE INDEX index_name ON table_name (siteid) USING BITMAP;
// 查看Bitmap索引(创建索引的过程是异步的,需要等待一段时间才可以查询到)
SHOW INDEX FROM example_db.table_name;
// 删除Bitmap索引
DROP INDEX index_name ON example_db.table_name;

注意事项:

  • bitmap 索引仅在单列上创建。
  • bitmap 索引能够应用在 DuplicateUniq 数据模型的所有列和 Aggregate模型的 key 列上。
  • bitmap 索引支持的数据类型如下:TINYINTSMALLINT、INTBIGINTCHARVARCHARDATEDATETIMELARGEINTDECIMALBOOL
  • bitmap 索引仅在 Segment V2 下生效。当创建 index 时,表的存储格式将默认转换为 V2 格式。

#4.3.4 添加 BloomFilter 索引

BloomFilter 是由 Bloom 在 1970 年提出的一种多哈希函数映射的快速查找算法。通常应用在一些需要快速判断某个元素是否属于集合,但是并不严格要求 100%正确的场合,BloomFilter 有以下特点:

  • 空间效率高的概率型数据结构,用来检查一个元素是否在一个集合中。
  • 对于一个元素检测是否存在的调用,BloomFilter 会告诉调用者两个结果之一:可能存在或者一定不存在。
  • 缺点是存在误判,告诉你可能存在,不一定真实存在。

布隆过滤器实际上是由一个超长的二进制位数组和一系列的哈希函数组成。二进制位数组初始全部为 0,当给定一个待查询的元素时,这个元素会被一系列哈希函数计算映射出一系列的值,所有的值在位数组的偏移量处置为 1。

下图所示出一个 m=18, k=3 (m 是该 Bit 数组的大小,k 是 Hash 函数的个数)的 Bloom Filter 示例。集合中的 x、y、z 三个元素通过 3 个不同的哈希函数散列到位数组中。当查询元素 w 时,通过 Hash 函数计算之后因为有一个比特为 0,因此 w 不在该集合中。

那么怎么判断某个元素是否在集合中呢?同样是这个元素经过哈希函数计算后得到所有的偏移位置,若这些位置全都为 1,则判断这个元素在这个集合中,若有一个不为 1,则判断这个元素不在这个集合中。

Bloom_filter

基本语法:

1
2
3
4
5
6
7
8
// 创建及修改BloomFilter索引
ALTER TABLE example_db.table_name SET ("bloom_filter_columns" = "column1,column2");

// 查看BloomFilter索引(创建索引的过程是异步的,需要等待一段时间才可以查询到)
SHOW CREATE TABLE table_name;

// 删除BloomFilter索引
ALTER TABLE example_db.table_name SET ("bloom_filter_columns" = "");

使用场景:

  • 首先 BloomFilter 适用于非前缀过滤。
  • 查询会根据该列高频过滤,而且查询条件大多是 in 和 = 过滤。
  • 不同于 Bitmap, BloomFilter 适用于高基数列。比如 UserID。因为如果创建在低基数的列上,比如 “性别” 列,则每个 Block 几乎都会包含所有取值,导致 BloomFilter 索引失去意义。

注意事项:

  • 不支持对 Tinyint、Float、Double 类型的列建 Bloom Filter 索引。
  • Bloom Filter 索引只对 in 和 = 过滤查询有加速效果。
  • 如果要查看某个查询是否命中了 Bloom Filter 索引,可以通过查询的 Profile 信息查看。

#4.3.5 Doris 数据查询存在的坑

[1] limit 数据查询需要指定 order by 才能使用

  • 问题描述:使用 limit offset,size 需手动指定 order by 才能使用,不兼容 mysql 语法。加上 order by 之后由于先在大数据上执行 order by 语句,会导致超时问题。
  • 报错信息:1105, 'errCode = 2, detailMessage = OFFSET requires an ORDER BY clause: LIMIT 100000, 100000'
  • 解决办法:不使用 limit offset,size 方式限制结果数量,而是根据 id 这种自增主键列去筛选。

[2] order by 默认会自带隐藏的 limit 65535

  • 问题描述:添加 order by 后,默认被加上了一个隐藏的 limit 65535,导致没有查询出全部结果。
  • 解决办法:在 order by 语句后面手动指定 limit,就会覆盖掉这个 65535 的默认限制。

[3] 一次性查询大量数据把 doris 服务给查崩了

  • 报错信息:have no queryable replicas. err: 88525’s backend 10015 does not exist or not alive
  • 解决办法:重启 doris 服务节点即可。

#5. 参考资料

[1] Doris 基本介绍 from Doris 官方文档(opens new window)

[2] DataX doriswriter from Doris 官方文档(opens new window)

[3] Apache doris 架构及组件介绍 from 知乎(opens new window)

[4] Python 连接 Doirs 查询实战 from CSDN(opens new window)

[5] Apache Doris (Incubating) 原理与实践 from InfoQ(opens new window)

[6] Doris 学习笔记之查询 from CSDN(opens new window)

[7] Bitmap 索引 from Doris 官方文档(opens new window)

[8] BloomFilter 索引 from Doris 官方文档(opens new window)

[9] support limit offset, size use default order from Github issues(opens new window)

[10] OFFSET requires an ORDER BY clause from Github issues(opens new window)

[11] 添加 order by 后,莫名被加上了一个默认的 limit 65535 from Github issues(opens new window)

[12] have no queryable replicas. err: 15606’s backend 10002 does not exist or not alive from Github issues(opens new window)

[13] 负载均衡 from Doris 官方文档

[14] Apache Doris 2.0 冷热分离快速体验

[15] doris 集群介绍

[16]自建 minio 实现 doris 的快速备份与恢复

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up