基于flink和redpanda搭建一个轻型实时es的索引

以下是您提供内容的精确中文翻译,保留所有技术细节与专业术语:


Elasticsearch 是一款可扩展、支持多租户的全文分布式搜索与分析引擎。它构建在Apache Lucene(高性能文本搜索引擎库)之上,采用开源模式。作为基于文档的数据库,Elasticsearch被广泛应用于分析、搜索和监控等场景。

Apache Flink® 是开源的分布式流批处理一体化平台,能够对无界和有界数据流进行无状态或有状态计算。它是构建实时数据管道和应用的强大工具,例如可作为事件驱动型应用中的触发机制——当新数据到达时触发搜索索引更新。Flink还可用于从持续流动的实时数据流中提取信息与洞察。

本教程将演示如何通过FlinkElasticsearchRedpanda(更简单、对开发者友好的Apache Kafka®替代方案)构建闪电般快速的搜索索引。


搜索索引背后的技术解析

在构建高速搜索索引时,性能延迟是关键指标。面对海量动态数据源(如全文搜索应用、分析仪表盘、自动补全功能和告警引擎),快速检索能力至关重要。

  • Flink负责数据转换、丰富与清洗
  • Elasticsearch建立索引实现可搜索性
    这种技术组合使用户能快速高效检索庞大数据集。

要创建面向Elasticsearch的实时流式管道,需优化数据流处理流程以确保高效准确处理。Redpanda为实现高性能数据传输提供理想解决方案:

  • 采用C++构建的轻量级流数据引擎,特别适合计算资源受限的边缘硬件
  • 完全兼容Apache Kafka® API,简化架构复杂度
  • 相较Kafka实现10倍性能提升,延迟波动更小

通过Redpanda与Elasticsearch的集成,可大幅提升Elasticsearch的数据存储与分析速度。


基于Redpanda+Flink+Elasticsearch的搜索索引实现

假设存在一个多服务系统,持续生成包含基础信息的用户活动事件。这些事件需要传输至专为搜索优化的数据存储中,且在入库前需通过元数据丰富事件内容以提升搜索效率。三者的协同工作将构建满足该场景的极速搜索索引。

下图展示了搜索索引系统的数据流水线(注:原文档此处应有架构图,可参考原始链接中的图表):

1
2
3
4
5
graph LR
A[用户活动事件源] --> B[Redpanda 数据流]
B --> C[Flink 流处理引擎]
C --> D[Elasticsearch 索引存储]
D --> E[应用层搜索接口]

流程说明

  1. 事件源将原始数据写入Redpanda流式平台
  2. Flink实时消费数据流,执行去重、字段补充、格式标准化等操作
  3. 处理后的结构化数据批量写入Elasticsearch建立倒排索引
  4. 应用层通过Elasticsearch API实现亚秒级检索响应

image.png

  • Python 产生事件发送到 Redpanda的 topic。
  • Flink 处理数据写入到Redpanda的另一个topic。
  • Elasticsearch 索引第二个Redpanda的 topic并提供查询功能。  All of the code resources for this tutorial can be found in this repository.  ## 前置条件:  1. docker  2. 运行在docker中的redpanda  3. anacoda运行python3.10以上版本  4. 安装jq命令cli。  ### docker运行redpanda程序  通过docker-compose来运行redpanda  CodeSnap_2025-05-21_at_095510.png 接下来准备flink+elastic+sqlclient的文件docker-compose文件 CodeSnap_2025-05-21_at_100052.png

这里需要注意到两个docker-compose文件,是共享一个网络的,都是redpanda-quickstart-one-broker_redpanda_network-,这样才可以保证网络的hostname互通。

我们可以注意到这里使用的镜像是自己构建的,需要添加对应的依赖:

1
2
3
4
5
6
7
8
9
FROM flink:1.18.1-scala_2.12-java11

  

# Download the connector libraries

RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar; \

    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar;

这里使用的flink版本为1.18.1 启动这个docker-compose文件的命令包含两步,第一步是构建,第二步是运行

1
2
docker-compose build --no-cache
docker-compose up -d

如果这一切正常可以达到如下的效果: image.png

创建topic

使用redpanda的命令来创建topic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
❯ docker exec -it redpanda-0 rpk cluster info                 
CLUSTER
=======
redpanda.870098f6-50b6-4942-9ad0-5e864aec863a

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092
docker exec -it redpanda-0 \
rpk topic create user_activity user_activity_enriched

TOPIC                   STATUS
user_activity           OK
user_activity_enriched  OK

连接kafka到elasticsearch

Elasticsearch 可以使用 Kafka Connect 和兼容的连接器(例如 Camel Elasticsearch Kafka Sink 连接器)轻松连接到 Redpanda。Kafka Connect 与 Apache Kafka 软件包捆绑在一起。要获取 Kafka Connect,请前往 Apache Kafka 下载页面,然后点击 Scala 2.13 的 Kafka 2.8.0 二进制包下载链接。下载二进制文件后,将其解压到项目根目录。接下来,在项目根目录下创建一个名为 configuration 的文件夹,并在其中创建 connect.properties 文件,内容如下:

CodeSnap_2025-05-21_at_101834.png 请注意,bootstrap.servers 地址设置为 localhost:9092,这与 Redpanda 集群地址相对应。现在需要设置 Camel Elasticsearch Index Sink Kafka 连接器。访问此网页,然后点击 camel-elasticsearch-index-sink-kafka-connector 旁边的下载链接。下载完成后,解压文件,然后将 camel-elasticsearch-index-sink-kafka-connector 文件夹复制到 plugins 目录。要设置 Elasticsearch 的接收器连接器,请在配置目录中创建一个名为 elasticsearch-sink-connector.properties 的文件,并在其中填充以下内容: CodeSnap_2025-05-21_at_102055.png 进入到congfiguration的目录下执行命令:

1
 ../kafka_2.13-3.7.2/bin/connect-standalone.sh connect.properties elasticsearch-sink-connector.properties

输出如下所示:

1
2
3
[2025-05-20 17:34:08,071] INFO [elasticsearch-sink-connector|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-connector-0, groupId=connect-elasticsearch-sink-connector] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:997)
[2025-05-21 08:34:02,007] WARN [elasticsearch-sink-connector|task-0] WorkerSinkTask{id=elasticsearch-sink-connector-0} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:243)
[2025-05-21 08:35:46,891] WARN [elasticsearch-sink-connector|task-0] WorkerSinkTask{id=elasticsearch-sink-connector-0} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:243)

运行producer.py脚本生产数据

安装一下依赖:

1
2
conda activate base
pip install kafka-python

查看一下producer.py

CodeSnap_2025-05-21_at_102622.png 运行生产命令: image.png

1
docker-compose run sql-client

看到如图所示 image.png

sql脚本运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31


SET 'pipeline.name' = 'user-activity-processing';


CREATE TABLE useractivity (
    id VARCHAR,
    activity_type VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_activity',
    'properties.bootstrap.servers' = 'redpanda-0:9092',
    'properties.group.id' = 'test-group',
    'properties.auto.offset.reset' = 'earliest',
    'format' = 'json'
);




CREATE TABLE useractivity_enriched WITH (
    'connector' = 'kafka',
    'topic' = 'user_activity_enriched',
    'properties.bootstrap.servers' = 'redpanda-0:9092',
    'format' = 'json'
) AS
SELECT id,
       Upper(activity_type),
       ts
FROM  useractivity;

注意每一条sql都要单独执行,否则会报错 如果成功启动就可以看到flink的任务启动在flink-web界面: image.png 查看数据是否正常生产到user_activity_enriched这个topic中

1
 docker exec -it redpanda-0 \                                rpk topic consume user_activity_enriched -f '%v\n'

再来检验这个topic的数据是否正常被es缩影到

1
curl 'http://localhost:9200/user_activity_enriched/_search' | jq

如果可以看到如下输出即代表正常 image.png

结论

本文演示了如何结合使用 Redpanda、Flink 和 Elasticsearch 来创建强大的数据管道和搜索应用程序。通过结合使用这些平台,您可以通过 Redpanda 流式传输数据,使用 Flink 执行 ETL 操作,并实时在 Elasticsearch 中索引数据。 参考文档: https://www.redpanda.com/blog/kafka-connect-elasticsearch-data-streaming

Licensed under CC BY-NC-SA 4.0
最后更新于 May 29, 2025 06:06 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up