郑文峰的博客 郑文峰的博客
首页
  • python之路
  • go之路
  • 其他
  • redis
  • mysql
  • docker
  • k8s
读书破万卷
周刊
关于
  • 导航 (opens new window)
  • 代码片段 (opens new window)
  • 收藏
  • 友链
  • 外部页面

    • 开往 (opens new window)
  • 索引

    • 分类
    • 标签
    • 归档
GitHub (opens new window)

zhengwenfeng

穷则变,变则通,通则久
首页
  • python之路
  • go之路
  • 其他
  • redis
  • mysql
  • docker
  • k8s
读书破万卷
周刊
关于
  • 导航 (opens new window)
  • 代码片段 (opens new window)
  • 收藏
  • 友链
  • 外部页面

    • 开往 (opens new window)
  • 索引

    • 分类
    • 标签
    • 归档
GitHub (opens new window)
  • python

  • go

  • 其他

    • 分布式锁
    • 使用hue创建ozzie的pyspark action workflow
    • 使用java开发logstash的filter插件
      • 0. 前言
      • 1. 准备开发环境
      • 2. 编写 logstash java filter 插件
        • 2.1 准备官方 demo
        • 2.2 开发 Filter 代码
      • 3. 单元测试
      • 4. 打包部署 Filter 插件
        • 4.1 元数据信息
        • 4.2 打包任务
        • 4.3 安装
      • 5. 验证
      • 6. 相关链接
    • count的性能优化
  • 编程
  • 其他
zhengwenfeng
2022-12-20
目录

使用java开发logstash的filter插件

# 0. 前言

在工作中遇到,logstash 中的 filter 中写了大量的解析逻辑,解析性能遇到瓶颈,所以希望将该部分的逻辑转换成 java 开发的插件,以提高解析速度。

本文主要记录我开发插件的过程。

# 1. 准备开发环境

下载 logstash 源码

直接可以去 logstash github (opens new window) 中选择自己使用的版本进行下载即可。

构建 logstash

将下载的 logstash 压缩包解压出来,进入 logstash 根目录下,当前路径下有 gradlew 和 gradlew.bat 两个脚本文件,前者是在 linux 下执行的,后者是在 windows 执行的脚本。

假设当前环境是 windows,执行 gradlew.bat assemble 命令可以对当前模块进行构建。在这个过程中会去下载所有的依赖包到本地。等待构建完成,直至输出 BUILD SUCCESSFUL 代表构建成功。

gradlew.bat 脚本是对 gradle 的封装,在执行该命令时,会主动根据 gradle/wrapper/ 下的配置去下载 gradle 工具,然后再调用 gradle 进行构建模块

# 2. 编写 logstash java filter 插件

# 2.1 准备官方 demo

下载 java 插件官方模板

将 logstash-filter-java_filter_example (opens new window) 下载到本地使用,自定义开发的插件是基于该 example 进行修改的。

构建插件

在该项目的根目录下,创建 gradle.properties 文件,需要添加变量指定 logstash 下的 logstash-core 目录路径,使用绝对路径即可。

LOGSTASH_CORE_PATH=<target_folder>/logstash-core
1

该变量是给 build.gradle 文件中使用的。

# 2.2 开发 Filter 代码

首先来看官方提供的 demo Filter 代码,代码路径在:src\main\java\org\logstashplugins\JavaFilterExample.java,我们开发的插件基本是按照这个例子进行修改实现的。

  • 设置 pipeline 中的插件名称

首先可以看到有一个注解 @LogstashPlugin(name = "java_filter_example") name 的值是指我们在 pipeline 中填写的插件名称。

  • 在 pipeline 中传参到插件中

通过 PluginConfigSpec.stringSetting 定义变量

public static final PluginConfigSpec<String> SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message");
1

再通过在构造方法中调用 get 方法即可获取到传入的值

this.sourceField = config.get(SOURCE_CONFIG);
1

并且需要将新增的字段添加到 configSchema 方法中并返回出去。

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
	// should return a list of all configuration options for this plugin
	return Collections.singletonList(SOURCE_CONFIG);
}
1
2
3
4
5
  • filter 主体编码

该插件的主体是 filter 方法,也就是数据的过滤走的 filter 方法,我们将想要做的解析规则实现在该方法中即可。

可以看到该方法中有一个对 events 遍历的处理,每一个 Event 都是进来的每一条数据,然后对该条数据进行处理转换,最后再将转换好的 events 传出去。

可以看到官方的案例是将传入的 message 字符串翻转。

@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
	for (Event e : events) {
		Object f = e.getField(sourceField);
		if (f instanceof String) {
			e.setField(sourceField, StringUtils.reverse((String)f));
			matchListener.filterMatched(e);
		}
	}

	return events;
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3. 单元测试

单测对插件来说至关重要,插件的规则转换流程、判断逻辑都非常多,各种类型的数据都可能导致插件出错,而插件验证需要编译、打包、安装再测试,流程较长,所以我们可以通过单测来减少以上流程的进行,在单测中就把所有的可能性都验证到,节省大量的时间。并且在后续迭代修改中,可以减少改动引发。

建议可以使用 junit 的参数化单测方式,可以提高单测的效率和数量。这个需要在 build.gradle 文件中的 dependencies 添加支持参数化的库来支持。

# 4. 打包部署 Filter 插件

# 4.1 元数据信息

我们需要在 build.gradle 文件中修改部分的插件元数据信息,像 description、authors 和 email 等字段都可以随意填写,以下字段需要注意:

  • group,需要和包名相同
  • pluginClass,需要和插件 Filter 的类名相同
  • pluginName,需要和 @LogstashPlugin 中的 name 相同

# 4.2 打包任务

通过执行 gradlew.bat gem 进行插件打包任务,最后会在插件根目下生成 .gem 的插件安装包文件。

# 4.3 安装

安装有在线安装和离线安装两种方式。

注意:我们需要去官网下载可以直接使用的 logstash,而不能使用上面自己下载的 logstash 源码。

在线安装

在线安装会去访问 Elastic 的官网,所以需要是在线的环境。

通过执行 logstash/bin 路径下的 logstash-plugin 命令进行安装,等待片刻即可安装成功。

logstash-plugin install /path/javaPlugin.gem
1

离线安装

在某些场景下,环境是不能连接外网的,所以需要使用离线安装的方式。

将生成的 gem 插件压缩到 zip 包中,然后再使用 logstash-plugin 命令进行安装。

logstash-plugin install file:///tmp/plugin.zip
1

# 5. 验证

官方的插件 example 的功能是翻转字符串的功能,所以我们只需要验证该功能即可。

  1. 创建一个 pipeline.conf
input {
    # 输入一个字符串
    generator { message => "Hello world!" count => 1 }
}

filter {
	# 在插件中@LogstashPlugin配置的插件名称
    java_filter_example {}
}

output {
    # 直接打印到控制台
    stdout { }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  1. 启动 logstash 加载上面的 pipeline.conf
logstash -f pipeline.conf
1

输出如下,可以看到 message 字段中的 Hello world!被翻转了。

{
	"host" => {
		"name" => "4-sip0060"
	},
	"event" => {
		"original" => "Hello world!",
		"sequence" => 0
	},
	"@timestamp" => 2022-12-20T07:27:46.634166300Z,
	"@version" => "1",
	"message" => "!dlrow olleH"
}
1
2
3
4
5
6
7
8
9
10
11
12

# 6. 相关链接

  • How to write a Java filter plugin (opens new window)
#组件#logstash#java
上次更新: 2023/01/15, 15:47:48
使用hue创建ozzie的pyspark action workflow
count的性能优化

← 使用hue创建ozzie的pyspark action workflow count的性能优化→

最近更新
01
django rest_framework 分页
03-20
02
学习周刊-第03期-第09周
03-03
03
学习周刊-第02期-第08周
02-24
更多文章>
Theme by Vdoing | Copyright © 2022-2023 zhengwenfeng | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式