配置Kafka output
kafka output 发送事件到apache kafka
https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
Example configuration
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
# message topic selection + partitioning
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
大于max_message_bytes的事件将被丢弃,确保Filebeat发出的事件小于max_message_bytes
kafka outpu可以工作与kafka 0.11 和 2.2.2之间的所有版本。旧版本或许也能执行,但不被支持。
enable
enable是一个布尔值,可以控制output的启用和停用。
默认值:true
列表,获取集群元数据的kafka borker地址。
version
假设filebeat所输出到的对应的kafka的版本
默认:1.0.0
username
连接kafka的用户名,如果设置了用户名就必须设置password
password
连接kafka的密码
sasl.mechanism
连接到 Kafka 时使用的 SASL 机制。处于测试阶段。
topic
用于生产事件的topic。
可以通过使用格式字符串访问任何事件字段来动态设置主题。例如,此配置使用自定义字段 fields.log_topic为每个事件设置主题:
topic: '%{[fields.log_topic]}'
如何添加事件的自定义fields:
https://www.elastic.co/guide/en/beats/filebeat/current/configuration-general-options.html#libbeat-configuration-fields
topics
一组topic选择器规则。每个规则指定topic用于匹配规则的events。Filebeat根据数组中的第一个匹配规则为每个事件设置topic 。规则可以包含conditionals、基于字符串格式的fields和name mapping。如果topics字段没有配置或者没有规则匹配上,则会使用topic字段
规则设置:
topic
要使用的topic的字符串格式,如果字符串包含字段引用。如:%{[fields.name]},那么引用的字段必须存在,否则规则失效。
mappings
字典,获取topic返回值,将其映射到新的名称
default
mappings未找到匹配项时,使用默认字符串的值
条件,必须成功才能执行当前规则。when支持的条件:
https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#conditions
以下示例根据消息字段是否包含指定字符串来设置topic:
output.kafka:
hosts: ["localhost:9092"]
topic: "logs-%{[agent.version]}"
topics:
- topic: "critical-%{[agent.version]}"
when.contains:
message: "CRITICAL"
- topic: "error-%{[agent.version]}"
when.contains:
message: "ERR"
这个配置规则的 topics 名称将会有 critical-7.13.3, error-7.13.3, 和 logs-7.13.3。
partition
client_id
用于日志记录、调试和审计目的的可配置 ClientID。默认为“beats”。
worker
负载平衡的 Kafka output工作进程的并发线程的数量。
codec
输出编解码器配置。如果codec缺少该部分,则事件将被 json 编码。
https://www.elastic.co/guide/en/beats/filebeat/current/configuration-output-codec.html
Kafka 元数据更新设置。元数据确实包含有关用于发布的brokers, topics, partition和活动领导者的信息。
refresh_frequency
元数据刷新间隔,默认十分钟。
获取元数据时使用的策略,当此选项为 时true,客户端将维护所有可用topic的完整元数据集,如果此选项设置为false它只会刷新配置topic的元数据。默认值为false。
retry.max
当集群处于领导者选举中间时,元数据更新重试的总数。默认值为 3。
retry.backoff
领导选举期间重试之间的等待时间。默认值为 250 毫秒。
max_retries
Filebeat 会忽略该max_retries设置,并无限期重试。
bulk_max_size
在单个 Kafka 请求中批量处理的最大事件数。默认值为 2048。
bulk_flush_frequency
发送批量 Kafka 请求之前等待的时间。0 是没有延迟。默认值为 0。
timeout
超时前等待 Kafka 代理响应的秒数。默认值为 30(秒)。
broket_timeout
代理等待所需 ACK 数量的最长时间。默认为 10 秒。
channel_buffer_size
每个 Kafka 代理在输出管道中缓冲的消息数。默认值为 256。
keep_alive
活动网络连接的保持活动期。如果为 0,则保持连接被禁用。默认值为 0 秒。
compression
设置输出压缩编解码器。必须是none,snappy,lz4和gzip其中一个。默认为gzip。
compression_level
设置 gzip 使用的压缩级别。将此值设置为 0 将禁用压缩。压缩级别必须在 1(最佳速度)到 9(最佳压缩)的范围内。
提高压缩级别会降低网络使用率,但会增加 CPU 使用率。
默认值为 4。
mas_message_bytes
JSON 编码消息的最大允许大小。较大的消息将被丢弃。默认值为 1000000(字节)。该值应等于或小于borker的message.max.bytes。
required_acks
borker要求的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。
注意:如果设置为 0,则 Kafka 不会返回任何 ACK。消息可能会在出错时静默丢失。
enable_krb5_fast
启用 Kerberos FAST 身份验证。这可能与某些 Active Directory 安装冲突。它与标准 Kerberos 设置不同,因为此标志仅适用于 Kafka 输出。默认为false。
Warning:此功能处于测试阶段,可能会发生变化。
SSL 参数的配置选项。
kerberos
Kerberos 身份验证的配置选项。
Warning:此功能处于测试阶段,可能会发生变化。
目录配置Kafka outputExample configuration兼容性配置选项enablehostversionusernamepasswordsasl.mechanismtopictopicskeypartitionclient_idworkercodecmetadatamax_retriesbulk_max_sizebulk_flush_frequencytimeoutbroket_timeout
它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。
Kafka
输出插件已移动
这个
Kafka
输出插件现在是的一部分。 在可能的情况下,该项目仍对该项目的修复向后移植到 8.x 系列保持开放,但应首先在上提交问题。
Logstash 提供了基础设施来自动为这个插件生成文档。 我们使用asciidoc格式编写文档,因此源代码中的所有注释都将首先转换为asciidoc,然后转换为html。 所有插件文档都放在一个。
对于格式化代码或
配置
示例,您可以使用 asciidoc [source,ruby]指令
有关更多 asciidoc 格式提示,请参阅此处的优秀参考
需要帮忙?
需要帮忙? 在 freenode IRC 或论坛上尝试 #logstash。
一、插件开发与测试
首先,您需要安
#-------------------------------
Kafka
output
----------------------------------
output
.
kafka
:
# 是否启用
enabled: true
# The list ...
Kafka
设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的
kafka
性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
针对这个问题,有以下几个建议:
最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用
Kafka
来传送文件的位置信息。
第二个方法是,将大的消息数据切片或切块,在生产端将数..
[2020-08-19T10:54:34,504][ERROR][logstash.
output
s.
kafka
] Unable to create
Kafka
producer from given configuration
{:
kafka
_error_message=>org.apache.
kafka
.common.config.ConfigException: Invalid value 32768 for
图片来源:
https://www.elastic.co/guide/en/beats/
filebeat
/current/
filebeat
-overview.html
通过
filebeat
配置
文件
filebeat
.yml 指定需要收集的日志,并指定输出至 elasticsearch,logstash,
kafka
,redis 等。
本文讲述如何
配置
filebeat
,将日志输
你想了解关于
Filebeat
和SASL
Kafka
的信息吗?
Filebeat
是一个开源的日志数据收集器,它可以轻松地将日志数据发送到不同的目的地。而SASL是指Simple Authentication and Security Layer,它是一种用于网络通信的认证和安全协议。
Kafka
是一个分布式流处理平台,它允许高吞吐量的实时数据传输。
如果你想使用
Filebeat
将日志数据发送到SASL认证的
Kafka
集群,你需要进行一些
配置
。首先,你需要在
Filebeat
的
配置
文件中设置
Kafka
输出,指定
Kafka
集群的地址和端口。然后,你需要
配置
SASL认证相关的
参数
,如认证机制、用户名和密码。这些
参数
可以根据你的
Kafka
集群的
配置
而有所不同。
一旦你完成了
配置
,启动
Filebeat
后,它将会开始收集并发送日志数据到SASL认证的
Kafka
集群。
希望这可以帮助到你!如有更多问题,请继续提问。
阿里云-云盘修改UUID
夜神-月:
kops-安装k8s集群自动化脚本示例
_Cooki_: