Doris Research Notes

Doris is an MPP analytical database originally developed at Baidu and contributed to the Apache Software Foundation.

Repository: https://github.com/apache/doris


Starting with Docker

```shell
cd docker/runtime/docker-compose-demo/example/mysql-flink-doris/
docker compose up -d
```

Pitfalls hit during startup

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
```

The cause: MySQL defaults to the UTC time zone, which needs to be changed. To check the current time zone:

```sql
SHOW VARIABLES LIKE '%time_zone%';
```


To make the change permanent, append the following after the [mysqld] section in my.cnf:

```
default-time-zone='+08:00'
```

Reference:

https://juejin.cn/post/7229987304125923383


Inserting Test Data

```sql
CREATE DATABASE test_inverted_index;

USE test_inverted_index;

-- Create the table and, at the same time, an inverted index idx_comment on the comment column.
--   USING INVERTED specifies that the index type is an inverted index.
--   PROPERTIES("parser" = "english") selects the english tokenizer; "chinese" is
--   also supported, and omitting "parser" means no tokenization.
CREATE TABLE hackernews_1m
(
    `id` BIGINT,
    `deleted` TINYINT,
    `type` String,
    `author` String,
    `timestamp` DateTimeV2,
    `comment` String,
    `dead` TINYINT,
    `parent` BIGINT,
    `poll` BIGINT,
    `children` Array<BIGINT>,
    `url` String,
    `score` INT,
    `title` String,
    `parts` Array<INT>,
    `descendants` INT,
    INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");
```

Another table used later for Stream Load testing:

```sql
CREATE TABLE `stream_load_test` (
    `order_number` varchar(160) NOT NULL COMMENT 'order number',
    `canal_type` varchar(96) NULL
) ENGINE=OLAP
UNIQUE KEY(`order_number`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_number`) BUCKETS 6
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "storage_format" = "V2",
    "light_schema_change" = "true",
    "disable_auto_compaction" = "false"
);
```

Importing data via Stream Load

```shell
wget https://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/index/hacknernews_1m.csv.gz

curl --location-trusted -u root: -H "compress_type:gz" -T hacknernews_1m.csv.gz  http://localhost:8030/api/test_inverted_index/hackernews_1m/_stream_load
{
    "TxnId": 2,
    "Label": "a8a3e802-2329-49e8-912b-04c800a461a6",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 130618406,
    "LoadTimeMs": 8988,
    "BeginTxnTimeMs": 23,
    "StreamLoadPutTimeMs": 113,
    "ReadDataTimeMs": 4788,
    "WriteDataTimeMs": 8811,
    "CommitAndPublishTimeMs": 38
}
```
An error encountered:
```shell
curl --location-trusted -u root: -H "compress_type:gz" -T hacknernews_1m.csv.gz  http://10.7.20.42:32130/api/test_inverted_index/hackernews_1m/_stream_load
{
    "TxnId": 2,
    "Label": "2afbc117-1e66-4d87-a2df-8249b8fdfbf9",
    "TwoPhaseCommit": "false",
    "Status": "Fail",
    "Message": "[ANALYSIS_ERROR]errCode = 2, detailMessage = transaction [2] is already aborted. abort reason: coordinate BE is down",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 130618406,
    "LoadTimeMs": 28038,
    "BeginTxnTimeMs": 25,
    "StreamLoadPutTimeMs": 218,
    "ReadDataTimeMs": 4530,
    "WriteDataTimeMs": 27761,
    "CommitAndPublishTimeMs": 0
}
```
The Doris FE log shows the transaction was aborted because a BE timed out; the BE log shows the load was cancelled by Doris's memory water mark mechanism.

Querying from the SQL console

```
MySQL [(none)]> USE test_inverted_index;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MySQL [test_inverted_index]> SELECT count() FROM hackernews_1m;
+---------+
| count() |
+---------+
| 1000000 |
+---------+
1 row in set (0.312 sec)

MySQL [test_inverted_index]> SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%';
+---------+
| count() |
+---------+
|      34 |
+---------+
1 row in set (0.254 sec)

MySQL [test_inverted_index]> SELECT count() FROM hackernews_1m WHERE comment MATCH_ANY 'OLAP';
+---------+
| count() |
+---------+
|      35 |
+---------+
1 row in set (0.033 sec)

MySQL [test_inverted_index]> SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLTP%';
+---------+
| count() |
+---------+
|      48 |
+---------+
1 row in set (0.072 sec)

MySQL [test_inverted_index]>  SELECT count() FROM hackernews_1m WHERE comment MATCH_ANY 'OLTP';
+---------+
| count() |
+---------+
|      51 |
+---------+
1 row in set (0.018 sec)

MySQL [test_inverted_index]> SELECT count() FROM hackernews_1m WHERE author = 'faster';
+---------+
| count() |
+---------+
|      20 |
+---------+
1 row in set (0.101 sec)
```
Sample messages for the Kafka topic:

```json
{"user_id": "1785543", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "1786543", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "1796543", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "1796553", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "1796743", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "2", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "5", "item_id":"12", "category_id": "1", "behavior": "pv1", "ts": "2023-6-26T01:00:00Z"}
```

Creating the Kafka table (Flink SQL)

```sql
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior2',
    'properties.bootstrap.servers' = '10.7.20.26:30001',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE doris_test_sink_1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'doris',
    'fenodes' = '192.168.120.181:8030',
    'table.identifier' = 'test_inverted_index.user_log',
    'username' = 'root',
    'sink.enable-delete' = 'false',
    'password' = ''
);

INSERT INTO doris_test_sink_1 SELECT * FROM user_log;
```

Sending a request to Doris (aborting a two-phase-commit Stream Load transaction)

```shell
curl -X PUT --location-trusted -u root:''  -H "txn_id:45077" -H "txn_operation:abort"  http://172.20.80.3:8040/api/db/t1/_stream_load_2pc
```

Resolving the Flink job issue:

While writing data from Flink into Doris, the job kept failing to complete successfully.

How the problem was resolved:

1. Fixed the dependency conflicts in the project.
2. Added logging and traced the problem through the logs.
3. httpclient 4.5.2 causes PUT requests to fail with HTTP 400; this pitfall took half a day to track down.

One of the errors seen along the way:

```
"Message": "[INTERNAL_ERROR]cancelled: [MEM_LIMIT_EXCEEDED]Process has no memory available, cancel top memory usage load: load memory tracker <Load#Id=fa46af10c2cad8c2-3fdbb3e9d2f8b39f> consumption 235.49 KB, backend 172.20.80.3 process memory used 530.98 MB exceed limit 15.81 GB or sys mem available 1.55 GB less than low water mark 1.60 GB. Execute again after enough memory, details see be.INFO.",
```
Licensed under CC BY-NC-SA 4.0
Last updated on Jan 06, 2025 05:52 UTC