Import Vector Data
This document was translated by ChatGPT
#1. Data Flow
#2. Configure Vector
#2.1 Install Vector
You can learn the relevant background knowledge in the Vector documentation (opens new window).
If your cluster does not have Vector, you can deploy it using the following steps:
## Add Vector repo
bash -c "$(curl -L https://setup.vector.dev)"
apt-get install vector -y
vector --version
### config file /etc/vector/vector.yaml
systemctl start vector
systemctl enable vector
2
3
4
5
6
7
## Add Vector repo
bash -c "$(curl -L https://setup.vector.dev)"
yum install vector -y
vector --version
### config file /etc/vector/vector.yaml
systemctl start vector
systemctl enable vector
2
3
4
5
6
7
8
helm repo add vector https://helm.vector.dev
helm repo update
cat << EOF > vector-values-custom.yaml
role: Agent
# FIXME: add nodeSelector here for deepflow
#nodeSelector:
# allow/vector: "false"
# resources -- Set Vector resource requests and limits.
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 200m
memory: 256Mi
image:
repository: timberio/vector
tag: 0.37.1-distroless-libc
podLabels:
vector.dev/exclude: "true"
app: deepflow
# extraVolumes -- Additional Volumes to use with Vector Pods.
# extraVolumes:
# - name: opt-log
# hostPath:
# path: "/opt/log/"
# extraVolumeMounts -- Additional Volume to mount into Vector Containers.
# extraVolumeMounts:
# - name: opt-log
# mountPath: "/opt/log/"
# readOnly: true
customConfig:
## The configuration comes from https://vector.dev/docs/reference/configuration/global-options/#data_dir
data_dir: /vector-data-dir
api:
enabled: true
address: 127.0.0.1:8686
playground: false
sources:
kubernetes_logs:
type: kubernetes_logs
namespace_annotation_fields:
namespace_labels: ""
node_annotation_fields:
node_labels: ""
pod_annotation_fields:
pod_annotations: ""
pod_labels: ""
transforms:
remap_kubernetes_logs:
type: remap
inputs:
- kubernetes_logs
source: |-
# try to parse json
if is_string(.message) && is_json(string!(.message)) {
tags = parse_json(.message) ?? {}
.message = tags.message # FIXME: the log content key inside json
del(tags.message)
.json = tags
}
if !exists(.level) {
if exists(.json) {
.level = to_string!(.json.level)
del(.json.level)
} else {
# match log levels surround by ``[]`` or ``<>`` with ignore case
level_tags = parse_regex(.message, r'[\[\\\<](?<level>(?i)INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\]\\\>]') ?? {}
if !exists(level_tags.level) {
# match log levels surround by whitespace, required uppercase strictly in case mismatching
level_tags = parse_regex(.message, r'[\s](?<level>INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\s]') ?? {}
}
if exists(level_tags.level) {
level_tags.level = upcase(string!(level_tags.level))
.level = level_tags.level
}
}
}
if !exists(._df_log_type) {
# default log type
._df_log_type = "user"
}
if !exists(.app_service) {
.app_service = .kubernetes.container_name
}
sinks:
http:
encoding:
codec: json
inputs:
- remap_kubernetes_logs
type: http
uri: http://deepflow-agent.deepflow/api/v1/log
EOF
helm install vector vector/vector \
--namespace vector \
--create-namespace \
-f vector-values-custom.yaml
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
Before configuration, you can first learn about the Vector workflow (opens new window). Data flows in the following module order, from the source to the destination:
Generally, a Vector configuration contains at least the sources module and the sinks module. If additional data processing is required, you must add the transforms module to clean the data into the final desired content. A typical Vector configuration looks like this:
# Data sources
sources:
nginx_logs:
# ...
file_logs:
# ...
kubernetes_logs:
# ...
# Data processing
transforms:
tag_log:
# inputs refers to the data source; here you can configure the key from sources or from other transforms
inputs:
- nginx_logs
# ...
flush_log:
# tag_log comes from the previous transforms module, so the same data is processed sequentially by two transforms modules
inputs:
- tag_log
- file_logs
# ...
# Data output
sinks:
push_log:
# Similarly, inputs here can come from both sources and transforms modules
inputs:
- flush_log
- kubernetes_logs
# ...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
In the above example, different configurations implement three different data flows:
Next, let's look at the specific configuration of each module.
#2.2 Collect Logs
After installing Vector, we can use the Kubernetes_Log (opens new window) module to obtain logs from Pods deployed in Kubernetes. Since DeepFlow has already learned the relevant Labels and Annotations of Pods in Kubernetes through the AutoTagging mechanism, you can remove this part of the content when sending log streams to reduce transmission volume. Example configuration:
sources:
kubernetes_logs:
type: kubernetes_logs
namespace_annotation_fields:
namespace_labels: ''
node_annotation_fields:
node_labels: ''
pod_annotation_fields:
pod_annotations: ''
pod_labels: ''
2
3
4
5
6
7
8
9
10
If you deploy Vector as a process on a cloud server, you can use the File (opens new window) module to obtain logs from a specified path. Using /var/log/ as an example:
sources:
files_logs:
type: file
include:
- /var/log/*.log
- /var/log/**/*.log
exclude:
# FIXME: If both kubernetes_logs and file modules are configured, remove k8s log folders to avoid duplicate monitoring
- /var/log/pods/**
- /var/log/containers/**
fingerprint:
strategy: 'device_and_inode'
2
3
4
5
6
7
8
9
10
11
12
#2.3 Inject Tags
We can then use the Remap (opens new window) module in Transforms to add necessary tags to the logs being sent. Currently, we require two tags: _df_log_type and level. Example configuration:
transforms:
remap_kubernetes_logs:
type: remap
inputs:
- kubernetes_logs
- files_logs
source: |-
# try to parse json
if is_string(.message) && is_json(string!(.message)) {
tags = parse_json(.message) ?? {}
.message = tags.message # FIXME: the log content key inside json
del(tags.message)
.json = tags
}
if !exists(.level) {
if exists(.json) {
.level = to_string!(.json.level)
del(.json.level)
} else {
# match log levels surround by `[]` or `<>` with ignore case
level_tags = parse_regex(.message, r'[\[\\<](?<level>(?i)INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\]\\>]') ?? {}
if !exists(level_tags.level) {
# match log levels surround by whitespace, required uppercase strictly in case mismatching
level_tags = parse_regex(.message, r'[\s](?<level>INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\s]') ?? {}
}
if exists(level_tags.level) {
level_tags.level = upcase(string!(level_tags.level))
.level = level_tags.level
}
}
}
if !exists(._df_log_type) {
# default log type
._df_log_type = "user"
}
if !exists(.app_service) {
# FIXME: files module does not have this field, please inject application name via log content
.app_service = .kubernetes.container_name
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
In this snippet, we assume that we may get both JSON-formatted logs and non-JSON logs. For both types, we try to extract the log level level. For JSON logs, we extract its content into the outer message field and put the remaining JSON keys into a field named json. At the end, we add _df_log_type=user and app_service=kubernetes.container_name tags to both types of logs.
If you have richer log formats to match in practice, refer to the Vrl (opens new window) syntax rules to customize your log extraction rules.
#2.4 Common Configurations
In addition to the above, the Transforms module can implement many features to help extract more accurate information from logs. Here are some common configurations:
#2.4.1 Merge Multi-line Logs
Recommendation: Use regex to match the "start pattern" of a log. Before encountering the next "start pattern", aggregate all logs into one message and keep line breaks. To reduce mismatches, match a datetime format like yyyy-MM-dd HH:mm:ss at the start of a log line.
transforms:
# The configuration comes from https://vector.dev/docs/reference/configuration/transforms/reduce/
multiline_kubernetes_logs:
type: reduce
inputs:
- kubernetes_logs
group_by:
- file
- stream
merge_strategies:
message: concat_newline
starts_when: match(string!(.message), r'^(\[|\[?\u001B\[[0-9;]*m|\{\".+\"|(::ffff:)?([0-9]{1,3}.){3}[0-9]{1,3}[\s\-]+(\[)?)?\d{4}[-\/\.]?\d{2}[-\/\.]?\d{2}[T\s]?\d{2}:\d{2}:\d{2}')
expire_after_ms: 2000 # unit: ms, aggregate logs max waiting timeout
flush_period_ms: 500 # unit: ms, flush expire events
2
3
4
5
6
7
8
9
10
11
12
13
14
#2.4.2 Filter Color Control Characters
Recommendation: Use regex to filter color control characters in logs to improve readability.
transforms:
# The configuration comes from https://vector.dev/docs/reference/configuration/transforms/remap/
flush_kubernetes_logs:
type: remap
inputs:
- multiline_kubernetes_logs
source: |-
.message = replace(string!(.message), r'\u001B\[([0-9]{1,3}(;[0-9]{1,3})*)?m', "")
2
3
4
5
6
7
8
#2.4.3 Extract Log Level
Recommendation: Use regex to try to match log levels in logs. To reduce mismatches, you can enclose log levels in symbols like [].
transforms:
# The configuration comes from https://vector.dev/docs/reference/configuration/transforms/remap/
remap_kubernetes_logs:
type: remap
inputs:
- flush_kubernetes_logs
source: |-
# match log levels surround by `[]` or `<>` with ignore case
level_tags = parse_regex(.message, r'[\[\\<](?<level>(?i)INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\]\\>]') ?? {}
if !exists(level_tags.level) {
# match log levels surround by whitespace, required uppercase strictly in case mismatching
level_tags = parse_regex(.message, r'[\s](?<level>INFOR?(MATION)?|WARN(ING)?|DEBUG?|ERROR?|TRACE|FATAL|CRIT(ICAL)?)[\s]') ?? {}
}
if exists(level_tags.level) {
level_tags.level = upcase(string!(level_tags.level))
.level = level_tags.level
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#2.4.4 Extract Custom Tags
If your application needs to inject some custom tags for log filtering, you can also use the Remap module in Transforms to write code to inject tags. We require that custom tags must be written into the .json struct to be stored and queried. Example:
transforms:
# The configuration comes from https://vector.dev/docs/reference/configuration/transforms/remap/
inject_json_tags:
type: remap
inputs:
- remap_kubernetes_logs
source: |-
.json = {
"cluster": "Production",
"extra_user_tag": "xxxxx" # FIXME: customize the tags you need
}
2
3
4
5
6
7
8
9
10
11
Then, when using the SQL API to query, you can filter the injected tags with:
curl -XPOST "http://${deepflow_server_node_ip}:${port}/v1/query/" \
--data-urlencode "db=application_log" \
--data-urlencode "sql=select attribute.cluster, body from log where attribute.cluster='Production'"
2
3
#2.5 Send
Finally, we use the HTTP (opens new window) module to send logs to the DeepFlow Agent.
sinks:
http:
encoding:
codec: json
inputs:
- remap_kubernetes_logs # NOTE: The data source here is the key of the transform module
type: http
uri: http://deepflow-agent.deepflow/api/v1/log
2
3
4
5
6
7
8
By combining these three modules, you can collect logs, inject tags, and finally send them to DeepFlow.
#2.6 Complete Example
Based on the above, here is a complete example. Suppose the collection target is an nginx application deployed on a cloud server, you can collect its logs and send them to DeepFlow with the following configuration:
sources:
nginx_logs:
type: file
include:
- /var/log/nginx/*.log
fingerprint:
strategy: 'device_and_inode'
transforms:
tag_nginx_log:
type: remap
inputs:
- nginx_logs
source: |-
source = parse_regex(.file, r'\/var\/log\/(?<file_source>.+)\.log') ?? {}
source.file_source = replace!(source.file_source, "/", "-")
hostname, _ = get_hostname()
.app_service = hostname + "-" + source.file_source
.json = {
"cluster": "Production",
"module": "nginx"
}
sinks:
http:
encoding:
codec: json
inputs:
- tag_nginx_log
uri: http://${deepflow-agent-host}:${port}/api/v1/log # FIXME: Fill in the address of the target DeepFlow Agent that can receive data
type: http
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#3. Configure DeepFlow
To allow the DeepFlow Agent to receive this data, please refer to the Configure DeepFlow section to complete the DeepFlow Agent configuration.