从零开始:使用 OpenTelemetry Collector 构建强大的日志处理流水线

从零开始:使用 OpenTelemetry Collector 构建强大的日志处理流水线

上篇文章 中,我们通过基于 Kubernetes 注解的 OpenTelemetry 动态发现为应用添加了日志采集的能力。从 OpenTelemetry Collector 日志我们可以看到为两个示例应用创建了 FileLog 接收器 来采集日志。

在本文中,我们将深入探索 OpenTelemetry Collector 的 FileLog 日志接收器的使用。

前文回顾

首先回看下上篇文章的结尾,Otel 的 Kubernetes Observer 发现了带有指定注解或者符合接收器创建规则的 Pod,自动为其创建了 FileLog 接收器。

我们将日志中 Reciver Creator 通过内置配置模板创建的接收器,转换为 OpenTelemetry Collector 配置文件:

config:
  receivers:
    filelog:
      include:
        - "/var/log/pods/default_java-sample-77b6d8f9c5-5zzh5_e6c5ca2a-6c3f-4ae4-89a6-34ca5cd9a3fa/java-sample/*.log"
      include_file_name: false
      include_file_path: true
      operators:
        - id: container-parser
          type: container

在这个配置中,我们指定了 filelog 接收器的配置:

  • include:指定了要采集的日志文件,这里是 java-sample 应用的日志文件。路径 /var/log/pods/<namespace>_<pod_name>_<pod_uid>/<restart_count>.log 是 Kubernetes 默认的日志路径。
  • include_file_name:是否包含文件名,默认为 true
  • include_file_path:是否包含文件路径,默认为 false
  • operators:日志处理流水线,可以使用多个处理器对日志进行流式处理。这里使用了类型为 container 处理器,用于解析容器日志。

除了上面的几个配置外,FileLog 接收器还支持 更多的配置,方便对日志采集进行灵活的定制。

Container Opeartor 是用于容器日志的处理器。它支持以下功能:

  • DockerContaineredCRI-O 日志格式的解析。
  • 从日志文件路径中提取容器元数据,比如容器名、容器 ID、命名空间、Pod 名等。
  • 支持多行日志的合并。

简单的几行配置就可以实现容器日志的采集。

Container Operator 是在 v0.101.0 版本中引入的,其大大简化了日志采集的配置。而在之前,用于容器日志采集的配置非常复杂的。

正好通过这个例子深入探索一下 FileLog 的核心特性 - 日志处理流水线

日志处理流水线

在 FileLog 接收器中,流水线是一系列 处理器 Operator 的组合。每个处理器都可以执行简单的操作,比如解析、过滤、转换、聚合等。所以流水线,就是将这些处理器按照一定的顺序组合在一起,形成一个处理链,是日志从读取到输出的整个处理过程。

每个处理器:

  • 有一个唯一的 ID。
  • 有一个类型,用于标识处理器的功能。
  • 可以有配置,用于指定处理器的行为。
  • 从上一个处理器接收日志,处理后输出到下一个处理器。
  • 处理器的处理顺序是按照定义的顺序从上到下执行,但可以通过 out 字段指定输出到下一个处理器。

从日志文件中找出上图中展示的那条日志。

2025-02-15T14:14:00.811085588Z stdout F 2025-02-15 14:14:00.810  INFO 1 --- [nio-8080-exec-9] c.a.d.s.SpringBootRestApplication        : Hello World

这是一条来自 Containerd 容器的日志,包含了如下几个部分:

  • 时间戳:2025-02-15T14:14:00.811085588Z,这是容器运行时记录日志的时间。
  • 日志来源:stdout,表示日志是从容器的标准输出(Standard Output)流中捕获的。
  • 日志格式类型:F,这里是日志格式类型。F(Full)表示完整的日志记录;P(Partial)表示日志是分段记录的一部分,通常用于处理超长日志行。。
  • 应用日志:这部分是应用程序本身记录的日志,包含了时间、日志级别、线程信息、类名、日志消息等。

让我们通过 FileLog 接收器的配置来解析这条日志,这套配置将实现 Container Operator 的全部功能。

配置示例

基础配置

首先在 Otel Operator 的配置中添加 FileLog 接收器的基础配置。

config:
  receivers:
    filelog:
      include:
      - "/var/log/pods/*/*/*.log"
      exclude:
      - "/var/log/pods/otel-collector*/*/*.log"
      start_at: end
      poll_interval: 300ms
      encoding: utf-8
      include_file_name: false
      include_file_path: true
      preserve_trailing_whitespaces: true
      preserve_leading_whitespaces: false
      operators:
        - 

其中包含了 FileLog 接收器的基础配置:

  • include:指定了要采集的日志文件,这里是所有容器的日志文件。
  • exclude:排除了一些不需要采集的日志文件,比如 Otel Collector 的日志文件。
  • start_at:指定了日志采集的起始位置,这里是从日志文件的末尾开始采集。
  • poll_interval:指定了日志文件的轮询间隔,这里是 300ms。
  • encoding:指定了日志文件的编码格式,这里是 UTF-8。
  • include_file_name:是否包含文件名,默认为 true
  • include_file_path:是否包含文件路径,默认为 false
  • preserve_trailing_whitespaces:是否保留行尾空格,默认为 true
  • preserve_leading_whitespaces:是否保留行首空格,默认为 false
  • operators:日志处理流水线。

接下来就是配置的核心,日志处理流水线。

流水线:容器日志解析

第一个处理器 parser-containerd,用于解析 Containerd 容器的日志。如果需要解析其他容器的日志,可以添加多个同类型的处理器,使用匹配的正则表达式。

- id: parser-containerd
  type: regex_parser
  regex: ^(?P<time>\S+Z)\s(?P<stream>stdout|stderr)\s(?P<logtag>[^ ]*)\s?(?P<log>.*)
  output: containerd-recombine

这里使用了 regex_parser 类型的处理器,用于解析正则表达式匹配的日志。该表达式将日志分为时间 time、日志来源 stream、日志格式类型 logtag、日志内容 log 等几个部分。

解析结果:

{
  "time": "2025-02-15T14:14:00.811085588Z",
  "stream": "stdout",
  "logtag": "F",
  "log": "2025-02-15 14:14:00.810  INFO 1 --- [nio-8080-exec-9] c.a.d.s.SpringBootRestApplication        : Hello World",
  "log.file.path"": "/var/log/pods/default_java-sample-77b6d8f9c5-5zzh5_e6c5ca2a-6c3f-4ae4-89a6-34ca5cd9a3fa/java-sample/0.log"
}

解析后的日志,输出到下一个处理器 containerd-recombine

流水线:多行日志合并

第二个处理器 containerd-recombine,用于合并多行日志。通过日志格式类型 logtag 是否为 F 来判断是否是多行日志的最后一行。

- id: containerd-recombine
  type: recombine
  combine_field: attributes.log
  combine_with: ""
  source_identifier: attributes["log.file.path"]
  is_last_entry: attributes.logtag == 'F'
  max_log_size: 0
  output: extract_metadata_from_filepath

这里使用了 recombine 类型的处理器,用于合并多行日志。这里的配置表示:

  • combine_field:合并后的日志字段名,还是 log
  • combine_with:合并多行日志的分隔符,这里为空。
  • source_identifier:合并多行日志的标识符,这里是日志文件路径。
  • is_last_entry:是否是多行日志的最后一行,这里是判断日志格式类型是否为 F
  • max_log_size:最大日志大小,这里是 0,表示不限制日志大小。
  • output:输出到下一个处理器 extract_metadata_from_filepath

流水线:提取日志元数据

第三个处理器 extract_metadata_from_filepath,用于提取日志文件路径中的元数据。

- id: extract_metadata_from_filepath
  type: regex_parser
  parse_from: attributes["log.file.path"]
  regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$

这里又是 regex_parser 类型的处理器,用于解析日志文件路径。这里的正则表达式将日志文件路径解析为命名空间 namespace、Pod 名 pod_name、Pod UID uid、容器名 container_name、重启次数 restart_count 等元数据。

示例用的日志文件路径 /var/log/pods/default_java-sample-77b6d8f9c5-5zzh5_e6c5ca2a-6c3f-4ae4-89a6-34ca5cd9a3fa/java-sample/0.log,正则的匹配结果。

提取到的元数据被合并到原来的日志结构中:

{
  "time": "2025-02-15T14:14:00.811085588Z",
  "stream": "stdout",
  "logtag": "F",
  "log": "2025-02-15 14:14:00.810  INFO 1 --- [nio-8080-exec-9] c.a.d.s.SpringBootRestApplication        : Hello World",
  "log.file.path"": "/var/log/pods/default_java-sample-77b6d8f9c5-5zzh5_e6c5ca2a-6c3f-4ae4-89a6-34ca5cd9a3fa/java-sample/0.log",
  "namespace": "default",
  "pod_name": "java-sample",
  "uid": "77b6d8f9c5-5zzh5",
  "container_name": "java-sample",
  "restart_count": "0"
}

这样,我们就完成了对 Containerd 容器日志的解析和元数据提取。如果我们要将日志输送给 Loki 来存储,还需要对日志做进一步转换成 Loki 的格式:将元数据转换成 Loki 的标签。

因此,我们还需要添加一些处理器,将元数据转换成 Loki 的标签。

流水线:元数据转换

数据的转换需要用到 move 处理器,它可以将一个字段的值移动到另一个字段。

- type: move
  from: attributes.namespace
  to: resource["k8s.namespace.name"]
- type: move
  from: attributes.pod_name
  to: resource["k8s.pod.name"]
- type: move
  from: attributes.uid
  to: resource["k8s.pod.uid"]
- type: move
  from: attributes.container_name
  to: resource["k8s.container.name"]
- type: move
  from: attributes.restart_count
  to: resource["k8s.container.restartCount"]

这里将命名空间 namespace、Pod 名 pod_name、Pod UID uid、容器名 container_name、重启次数 restart_count 等元数据转换成 Loki 的 resource 标签。

根据应用程序日志格式的不同,可能还需要多额外的处理。

流水线:堆栈日志合并

一些应用程序输出多个日志线,实际上是单个日志记录。一个常见的例子就是 Java 堆栈跟踪:

java.lang.Exception: Stack trace
        at java.lang.Thread.dumpStack(Thread.java:1336)
        at Main.demo3(Main.java:15)
        at Main.demo2(Main.java:12)
        at Main.demo1(Main.java:9)
        at Main.demo(Main.java:6)
        at Main.main(Main.java:3)

recombine 处理器又派上用场了。

- type: recombine
  combine_field: body
  is_first_entry: body matches "^[^\\s]"
  source_identifier: attributes["log.file.path"]

这里的配置表示:

  • combine_field:合并后的日志字段名,还是 body
  • is_first_entry:是否是多行日志的第一行,这里是判断日志是否以非空字符开头。
  • source_identifier:合并多行日志的标识符,这里是日志文件路径。

这样,我们就完成了对容器日志的解析、元数据提取和 Loki 标签转换等一系列操作。

完整配置

最终的完整版配置如下:

config:
  receivers:
    filelog:
      include:
      - "/var/log/pods/*/*/*.log"
      exclude:
      - "/var/log/pods/otel-collector*/*/*.log"
      - "/var/log/pods/loki*/*/*.log"
      start_at: end
      poll_interval: 300ms
      encoding: utf-8
      include_file_name: false
      include_file_path: true
      preserve_trailing_whitespaces: true
      preserve_leading_whitespaces: false
      operators:
        - id: parser-containerd
          type: regex_parser
          regex: ^(?P<time>\S+Z)\s(?P<stream>stdout|stderr)\s(?P<logtag>[^ ]*)\s?(?P<log>.*)
          output: containerd-recombine
        - id: containerd-recombine
          type: recombine
          combine_field: attributes.log
          combine_with: ""
          source_identifier: attributes["log.file.path"]
          is_last_entry: attributes.logtag == 'F'
          max_log_size: 0
          output: extract_metadata_from_filepath
        - id: extract_metadata_from_filepath
          type: regex_parser
          parse_from: attributes["log.file.path"]
          regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
        - type: move
          from: attributes.namespace
          to: resource["k8s.namespace.name"]
        - type: move
          from: attributes.pod_name
          to: resource["k8s.pod.name"]
        - type: move
          from: attributes.uid
          to: resource["k8s.pod.uid"]
        - type: move
          from: attributes.container_name
          to: resource["k8s.container.name"]
        - type: move
          from: attributes.restart_count
          to: resource["k8s.container.restartCount"]
        - type: recombine
          combine_field: body
          is_first_entry: body matches "^[^\\s]"
          source_identifier: attributes["log.file.path"]

实验过程不再赘述,有兴趣的小伙伴可以参考 上篇文章 搭建一套 OpenTelemetry + Loki + Grafana 的环境,然后将上面的配置应用到 Otel Collector。

总结

在本文中,我们深入探索了 OpenTelemetry Collector 的 FileLog 日志接收器的使用。通过配置日志处理流水线,我们可以对容器日志进行解析、元数据提取和 Loki 标签转换等一系列操作,实现了 Container Opeartor 的全部功能。

在掌握了 FileLog 接收器的使用后,我们可以更加灵活地对容器日志进行采集和处理,还能采集和处理如系统日志等其他类型的日志。

(转载本站文章请注明作者和出处乱世浮生,请勿用于任何商业用途)

comments powered by Disqus