在使用Logstash传输数据时,在某些业务使用场景中,您可能需要切分源端数据并提取到字段中再写入目标端Elasticsearch集群。例如,源端Logs日志中存在以竖线(
|
)分隔的数据,此时您可以通过Logstash按照
|
切分数据并提取到字段中,再输出到目标端Elasticsearch集群。本文介绍如何通过Logstash切分数据并提取到字段中。
背景信息
logstash-filter-mutate插件(过滤器插件)支持对事件中的字段进行切分、重命名、删除、替换和修改等操作,详细信息请参见
Mutate filter plugin
。所有的过滤器插件都支持以下常见的可选配置项,详细信息请参见
Common Option
。
配置项 | 输入类型 |
---|---|
add_field | hash |
add_tag | array |
enable_metric | boolean |
id | string |
periodic_flush | boolean |
remove_field | array |
remove_tag | array |
前提条件
您已完成以下操作:
-
创建阿里云Elasticsearch实例。
具体操作,请参见 创建阿里云Elasticsearch实例 ,本文以7.10版本实例为例。
- 开启目标Elasticsearch实例的自动创建索引功能。
-
创建阿里云Logstash实例,需要与Elasticsearch实例在同一专有网络下。
具体操作,请参见 创建阿里云Logstash实例 。
-
准备测试数据。
本文以Beats采集的Logs中的某一条数据为例,关于Beats采集数据的详细信息请参见 采集ECS服务日志 。如下测试数据中的 LogMessage 以
|
分隔,并存在多个||||
特殊符号。本文使用Logstash按照|
切分数据,被切分的字段分别写入到对应的字段中:mobile、appName、type、timestamp、status、code、component、cid、serviceId、serviceName、serviceType、param,最后将字段输出到目标端Elasticsearch集群中。LogMessage: |1390000****|jop|byORP|2022-04-18T14:18:16.633|/log/cms/send|200|pluginNums=0,pluginStatus=0||||||
操作步骤
- 进入 阿里云Elasticsearch控制台的Logstash页面 。
-
进入目标实例。
- 在顶部菜单栏处,选择地域。
- 在 Logstash实例 中单击目标实例ID。
- 在左侧导航栏,单击 管道管理 。
- 单击 创建管道 。
-
在
创建管道任务
页面,输入管道ID并配置管道。
本文使用的管道配置如下。
input { beats { port => 8001 filter { mutate { gsub => ["message","\|","| "] split => ["message","|"] add_field => { "mobile" => "%{[message][1]}" "appName" => "%{[message][2]}" "type" => "%{[message][3]}" "timestamp" => "%{[message][4]}" "status" => "%{[message][5]}" "code" => "%{[message][6]}" "component" => "%{[message][7]}" "cid" => "%{[message][8]}" "serviceId" => "%{[message][9]}" "serviceName" => "%{[message][10]}" "serviceType" => "%{[message][11]}" "param" => "%{[message][12]}" mutate { strip => ["mobile","appName","type","timestamp","status","code","component","cid","serviceId","serviceName","serviceType","param"] output { elasticsearch { index => "<yourIndexName>" hosts => ["es-cn-7mz2mu1zp0006****.elasticsearch.aliyuncs.com:9200"] user => "elastic" password => "<yourPassword>" }
重要- input.beat.port 为Beats采集日志输入到当前Logstash管道的端口,需要使用8000~9000范围内的端口。
-
以上管道配置中的
gsub => ["message","\|","| "]
,第二个|
后有一个空格。 - 以上管道配置中的 index 、 hosts 和 password 参数值需要替换为您实际业务的索引名称、Elasticsearch集群的访问地址和集群的elastic账号对应的密码。
以上管道配置的原理说明如下:-
通过Logstash的
filter.mutate.gsub
参数,使用正则表达式
\|
去匹配 LogMessage 中的|
,将|
替换为|
(即|
+空格)。替换后的效果如下。LogMessage: | 1390000****| jop| byORP| 2022-04-18T14:18:16.633| /log/cms/send| 200| pluginNums=0,pluginStatus=0| | | | | |
-
通过
filter.mutate.split
参数将
LogMessage
按照
|
进行切分。 -
通过
filter.mutate.add_field
参数添加字段,即将切分后的
LogMessage
一一添加到对应的字段中。添加后的效果如下。
"mobile":" 1390000****", "appName":" jop", "type":" byORP", "timestamp":" 2022-04-18T14:18:16.633", "status":" /log/cms/sen", "code":" 200", "component":" pluginNums=0,pluginStatus=0", "cid":" ", "serviceId":" ", "serviceName":" ", "serviceType":" ", "param":" "
- 通过 filter.mutate.strip 参数去除字段空格。由于添加后的每个字段前面都有一个空格,因此需要去除这些空格。
更多管道配置说明,请参见 通过配置文件管理管道 和 Logstash配置文件说明 。
警告 配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。 -
单击
保存
或者
保存并部署
。
- 保存 :将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回 管道管理 页面。可在 管道列表 区域,单击 操作 列下的 立即部署 ,触发实例重启,使配置生效。
- 保存并部署 :保存并且部署后,会触发实例重启,使配置生效。
验证结果
-
登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。
登录Kibana控制台的具体操作,请参见 登录Kibana控制台 。说明 本文以阿里云Elasticsearch 7.10.0版本为例,其他版本操作可能略有差别,请以实际界面为准。
- 单击右上角的 Dev tools 。
-
在
Console
中,执行以下脚本,查询目标索引中的信息。
GET <yourIndexName>/_search "query": { "match_all": {} }
说明 <yourIndexName> 需要与管道配置中的 index 参数值保持一致。预期结果如下。
{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 "hits" : { "total" : { "value" : 1, "relation" : "eq" "max_score" : 1.0, "hits" : [ "_index" : "<yourIndexName>", "_type" : "_doc", "_id" : "Lb1UWoAB-6Zo6en4luDi", "_score" : 1.0, "_source" : { "mobile" : "1390000****", "appName" : "jop", "type" : "byORP", "timestamp" : "2022-04-18T14:18:16.633", "status" : "/log/cms/sen", "code" : "200", "component" : "pluginNums=0,pluginStatus=0", "cid" : "", "serviceId" : "", "serviceName" : "", "serviceType" : "", "param" : ""