Distributed Logging and Data Synchronization with the ELK Stack
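ELK Architecture: Core Components
The ELK stack consists of three open-source tools that together handle log collection, storage, analysis, and visualization:
- Logstash: a data pipeline that collects, filters, and forwards logs or business data from many kinds of sources.
- Elasticsearch: a distributed engine for full-text search and analytics that supports near-real-time queries over large data volumes.
- Kibana: built on top of Elasticsearch, it provides dashboards and an interactive interface for exploring data.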
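Collecting Application Logs with Logstash
With the Elasticsearch (port 9200) and Kibana (port 5601) containers already deployed, the next step is a Logstash service that centralizes application logs.
Deploying a Standalone Logstash Instance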
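- Create the configuration directory:
mkdir -p /opt/logstash/conf
- Write the settings file logstash.yml (http.host and http.port expose Logstash's HTTP monitoring API on port 4560; xpack.monitoring.elasticsearch.hosts tells Logstash where to send its monitoring data):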
http.host: "0.0.0.0"
http.port: 4560
xpack.monitoring.elasticsearch.hosts: ["http://192.168.40.77:9200"]
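- Define the pipeline in logstash.conf: a tcp input that accepts newline-delimited JSON on port 5000, and an elasticsearch output that writes one index per day: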
input {
  tcp {
    mode => "server"
    host => "0.0.0.0"
    port => 5000
    # Each received line is parsed as one JSON event
    codec => "json_lines"
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.40.77:9200"]
    # One index per day, e.g. app-logs-2024.01.01
    index => "app-logs-%{+yyyy.MM.dd}"
  }
}
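Before wiring up an application, the tcp input can be smoke-tested with any client that writes one JSON object per line to port 5000. The following minimal JDK-only sketch does exactly that; the class JsonLineSender, its methods, and the field names are hypothetical, and the hand-rolled escaping is only meant for simple test strings (a JSON library is the safer choice in real code).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class JsonLineSender {
    // Minimal JSON string escaping: quotes, backslashes, and control characters.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.toString();
    }

    // Builds one newline-terminated JSON object: the framing the json_lines codec expects.
    static String toJsonLine(String application, String message) {
        return "{\"application\":\"" + escape(application)
                + "\",\"message\":\"" + escape(message) + "\"}\n";
    }

    // Opens a TCP connection to the Logstash tcp input and writes the line as UTF-8.
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}
```

Calling JsonLineSender.send("192.168.40.77", 5000, JsonLineSender.toJsonLine("smoke-test", "hello")) should then produce a document in the current app-logs-* index, assuming the container is running and port 5000 is reachable.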
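- Grant read/write permissions:
chmod -R 777 /opt/logstash
- Start the Docker container (the my-net network must already exist; 5000 is the log input port and 4560 the API port):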
docker run -d --name log-collector --network my-net \
-p 5000:5000 -p 4560:4560 \
-v /opt/logstash/conf/logstash.yml:/usr/share/logstash/config/logstash.yml \
-v /opt/logstash/conf/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
registry.cn-hangzhou.aliyuncs.com/joezhou/logstash:8.4.0
- Open the ports in the firewall:
firewall-cmd --add-port=5000/tcp --permanent
firewall-cmd --add-port=4560/tcp --permanent
firewall-cmd --reload
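To confirm from the application host that the firewall change took effect, a plain TCP connect to ports 5000 and 4560 is enough. A minimal JDK-only sketch; the class PortCheck and its method are hypothetical names:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // A refused or timed-out connection (for example, a port still blocked by firewalld) yields false.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, PortCheck.isOpen("192.168.40.77", 5000, 2000) and the same call for 4560 should both return true once the container is up and the rules are reloaded.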
Pushing Logs from Spring Boot
Add the encoder dependency to the Java project:
<dependency>
  <groupId>net.logstash.logback</groupId>
  <artifactId>logstash-logback-encoder</artifactId>
  <version>7.2</version>
</dependency>
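Then add a remote appender to logback.xml (logback-spring.xml also works in a Spring Boot project) that sends each event as a JSON line to the Logstash tcp input: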
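<!-- Ships each log event as one JSON line to the Logstash tcp input on port 5000 -->
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
  <destination>192.168.40.77:5000</destination>
  <encoder class="net.logstash.logback.encoder.LogstashEncoder">
    <!-- Extra field added to every event, useful for filtering by service in Kibana -->
    <customFields>{"application":"service-a"}</customFields>
  </encoder>
</appender>
<!-- The CONSOLE appender must be defined elsewhere in the same file -->
<root level="INFO">
  <appender-ref ref="CONSOLE"/>
  <appender-ref ref="LOGSTASH"/>
</root>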
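Next, in Kibana, create an index pattern app-logs-* (called a data view in Kibana 8.x), select @timestamp as the time field, and browse the logs in Discover.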
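The date in app-logs-%{+yyyy.MM.dd} is taken from each event's @timestamp in UTC, so the daily indices roll over at 08:00 Beijing time (UTC+8) rather than at local midnight. The app-logs-* pattern hides this when searching, but it matters when deleting or archiving old indices by name. A small sketch of the mapping; DailyIndexName is a hypothetical helper:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class DailyIndexName {
    // Same date pattern as the Logstash index setting, evaluated in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy.MM.dd").withZone(ZoneOffset.UTC);

    // Returns the index an event with the given @timestamp is written to.
    static String indexFor(Instant timestamp) {
        return "app-logs-" + DAY.format(timestamp);
    }
}
```

An event logged at 00:30 on 2 January Beijing time (2024-01-01T16:30:00Z) therefore lands in app-logs-2024.01.01.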
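Syncing MySQL Data to Elasticsearch with Logstash
To avoid adding load to the primary database, run the extraction jobs against a read-only replica in the MySQL cluster.
Full Import Configuration
Suitable for the initial load or for periodically refreshing the entire dataset.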
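input {
  jdbc {
    # Path inside the Logstash container; the JAR has to be mounted or copied there
    jdbc_driver_library => "/data/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.40.77:3306/es"
    # Demo credentials; a read-only account on the replica is preferable
    jdbc_user => "root"
    jdbc_password => "root"
    statement => "SELECT * FROM dept ORDER BY updated ASC"
    # Cron syntax: runs every minute; a periodic full refresh usually needs a longer interval
    schedule => "* * * * *"
    # Fetch the result set in pages of 500 rows
    jdbc_paging_enabled => true
    jdbc_page_size => 500
  }
}
filter {
  # Drop the @version field Logstash adds to every event
  mutate { remove_field => ["@version"] }
}
output {
  elasticsearch {
    hosts => ["http://192.168.40.77:9200"]
    index => "dept"
    # Primary key as document _id, so re-imports overwrite documents instead of duplicating them
    document_id => "%{deptno}"
  }
}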
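With jdbc_paging_enabled, the plugin does not send the statement as-is: it wraps it as a subquery and fetches it page by page with LIMIT/OFFSET, roughly SELECT * FROM (statement) AS t1 LIMIT 500 OFFSET n. The JDK-only sketch below only generates such page queries to show the mechanics; the exact SQL the plugin emits may differ between versions, and PagedQueries is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class PagedQueries {
    // Wraps the user statement as a subquery and pages through it with LIMIT/OFFSET,
    // approximating what jdbc_paging_enabled + jdbc_page_size do.
    static List<String> pageQueries(String statement, long totalRows, int pageSize) {
        List<String> queries = new ArrayList<>();
        for (long offset = 0; offset < totalRows; offset += pageSize) {
            queries.add("SELECT * FROM (" + statement + ") AS t1 LIMIT " + pageSize + " OFFSET " + offset);
        }
        return queries;
    }
}
```

This is why the ORDER BY in the statement matters: OFFSET paging is only stable when every page sees the rows in the same order.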
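Incremental Sync Strategy
Track changes through a timestamp column so that each run fetches only the rows inserted or modified since the previous run.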
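input {
  jdbc {
    ...
    # :sql_last_value is the last tracking_column value seen in the previous run
    statement => "SELECT * FROM emp WHERE updated > :sql_last_value ORDER BY updated ASC"
    use_column_value => true
    tracking_column => "updated"
    tracking_column_type => "timestamp"
    # Persist the last value to this file so it survives restarts
    record_last_run => true
    last_run_metadata_path => "/data/emp_last_run.txt"
    # false: resume from the stored value instead of starting over
    clean_run => false
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.40.77:9200"]
    index => "emp"
    document_id => "%{empno}"
  }
}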
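The tracking logic can be simulated in memory to see which rows each scheduled run picks up. In this sketch, IncrementalSync and EmpRow are hypothetical, the field holding the last value stands in for the metadata file, and the starting value is passed in explicitly rather than assumed.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class IncrementalSync {
    // Hypothetical row shape: the emp primary key plus the tracking column.
    record EmpRow(long empno, LocalDateTime updated) {}

    // In-memory stand-in for :sql_last_value (the plugin keeps it in last_run_metadata_path).
    private LocalDateTime sqlLastValue;

    IncrementalSync(LocalDateTime initialValue) {
        this.sqlLastValue = initialValue;
    }

    // One scheduled run: WHERE updated > :sql_last_value ORDER BY updated ASC,
    // then remember the tracking value of the last row returned.
    List<EmpRow> run(List<EmpRow> table) {
        List<EmpRow> batch = new ArrayList<>();
        for (EmpRow row : table) {
            if (row.updated().isAfter(sqlLastValue)) {
                batch.add(row);
            }
        }
        batch.sort(Comparator.comparing(EmpRow::updated));
        if (!batch.isEmpty()) {
            sqlLastValue = batch.get(batch.size() - 1).updated();
        }
        return batch;
    }

    LocalDateTime lastValue() {
        return sqlLastValue;
    }
}
```

The strict > comparison has a side effect worth knowing: a row committed after a run but carrying exactly the stored timestamp is never picked up, which is more likely when the column has only second precision.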
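Note: physical (hard) deletes are not captured, because a deleted row no longer appears in any query result; mark rows with a logical-delete flag instead and handle that flag in the pipeline.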
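With a logical-delete flag, each synced row either updates its document or removes it. In Logstash this is typically a conditional around the elasticsearch output using action => "delete"; the JDK-only sketch below only models the decision, with a HashMap standing in for the index. The flag column is_deleted and the class SoftDeleteApplier are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SoftDeleteApplier {
    // Hypothetical row: primary key, payload, and an is_deleted flag column.
    record Row(long id, String name, boolean isDeleted) {}

    // Stand-in for the Elasticsearch index: document _id -> document source.
    private final Map<Long, String> index = new HashMap<>();

    // Flag set: delete the document with that _id; otherwise index (overwrite) it.
    void apply(Row row) {
        if (row.isDeleted()) {
            index.remove(row.id());
        } else {
            index.put(row.id(), row.name());
        }
    }

    Map<Long, String> index() {
        return index;
    }
}
```

For the incremental pipeline to see the change at all, setting the flag must also bump the updated column; otherwise the row never satisfies updated > :sql_last_value again.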
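Advanced Spring Data Elasticsearch
Spring Data's abstraction layer simplifies everyday Elasticsearch operations.
Entity Mapping and Analyzer Settings
Recreate the index so that dname uses the IK Chinese analyzer. This requires the IK analysis plugin on every Elasticsearch node, and because the analyzer of an existing field cannot be changed, the old index must be deleted first (DELETE /dept) and the data re-imported afterwards: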
PUT /dept
{
  "mappings": {
    "properties": {
      "dname": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      }
    }
  }
}
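The delete-then-create step can also be scripted with the JDK's built-in HTTP client. This sketch only builds the two requests; IndexRecreate is a hypothetical helper, and the base URL and mapping body are whatever the caller passes in.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

class IndexRecreate {
    // Builds the two requests in order: drop the old index, then create it with the new mapping.
    static List<HttpRequest> requests(String baseUrl, String index, String mappingJson) {
        URI uri = URI.create(baseUrl + "/" + index);
        HttpRequest delete = HttpRequest.newBuilder(uri)
                .DELETE()
                .build();
        HttpRequest create = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(mappingJson))
                .build();
        return List.of(delete, create);
    }
}
```

Each request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); a 404 on the DELETE simply means the index did not exist yet. After the PUT, the full import has to run again to repopulate the index.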
Custom Type Converter
Bridge Java 8 date/time types such as LocalDateTime and the date strings stored in Elasticsearch:
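import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.data.elasticsearch.core.mapping.PropertyValueConverter;

public class LocalDateTimeConverter implements PropertyValueConverter {
    // Matches strings such as 2024-01-01T08:00:00.000Z.
    // 'Z' is a literal here, so no time-zone conversion takes place.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    // Java -> Elasticsearch
    @Override
    public Object write(Object value) {
        if (value instanceof LocalDateTime dt) {
            return dt.format(FORMATTER);
        }
        throw new IllegalArgumentException("Expected LocalDateTime but got: " + value);
    }

    // Elasticsearch -> Java
    @Override
    public Object read(Object value) {
        if (value instanceof String str) {
            return LocalDateTime.parse(str, FORMATTER);
        }
        throw new IllegalArgumentException("Expected a date string but got: " + value);
    }
}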
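Because the pattern matches 'Z' as a literal, read() keeps the UTC wall-clock time and silently drops the offset. The JDK-only sketch below contrasts that with an explicit conversion to a chosen zone, which may be preferable if the stored strings are genuine UTC instants; DateFormatCheck is a hypothetical class, and which behaviour is correct depends on how the timestamps were written.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class DateFormatCheck {
    // Same pattern as LocalDateTimeConverter.
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    // What the converter's read() does: the UTC wall-clock time is kept as-is.
    static LocalDateTime readLiteral(String value) {
        return LocalDateTime.parse(value, FORMATTER);
    }

    // Alternative: treat the string as a UTC instant and convert it to the given zone.
    static LocalDateTime readAsZone(String value, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.parse(value), zone);
    }
}
```

For 2024-01-01T08:00:00.000Z the first method yields 08:00, while converting to Asia/Shanghai yields 16:00.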
Document Entity Class
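import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.ValueConverter;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "dept")
public class DepartmentDoc {
    @Id
    private Long deptno; // matches the _id set by document_id => "%{deptno}"
    // Indexed with ik_max_word, searched with ik_smart (IK plugin required)
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String dname;
    @Field(type = FieldType.Keyword) // exact-match field, not analyzed
    private String loc;
    // Stored as a date string; the converter handles LocalDateTime <-> String
    @Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    @ValueConverter(LocalDateTimeConverter.class)
    private LocalDateTime updated;
}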
Repository Interface and Common Methods
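import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
public interface DepartmentRepository extends ElasticsearchRepository<DepartmentDoc, Long> {
    List<DepartmentDoc> findByDnameContaining(String keyword); // query derived from the method name
    Page<DepartmentDoc> findByDnameContainingOrderByDeptnoDesc(String keyword, Pageable pageable); // sorted, paged variant
}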
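Spring Data derives the query from the method name: Containing becomes a match condition on dname, OrderByDeptnoDesc adds a sort, and the Pageable argument selects one page. The JDK-only sketch below reproduces those semantics over an in-memory list purely to illustrate what the name encodes. It is not how Spring Data executes the query (it generates an Elasticsearch query, and matching on an IK-analyzed text field follows the analyzer rather than String.contains); DerivedQueryDemo and Dept are hypothetical.

```java
import java.util.Comparator;
import java.util.List;

class DerivedQueryDemo {
    record Dept(long deptno, String dname) {}

    // In-memory analogue of findByDnameContainingOrderByDeptnoDesc(keyword, PageRequest.of(page, size)):
    // filter on dname, sort by deptno descending, then cut out the requested page.
    static List<Dept> findPage(List<Dept> all, String keyword, int page, int size) {
        return all.stream()
                .filter(d -> d.dname().contains(keyword))
                .sorted(Comparator.comparingLong(Dept::deptno).reversed())
                .skip((long) page * size)
                .limit(size)
                .toList();
    }

    // Total page count as ceil(totalMatches / size).
    static int totalPages(long totalMatches, int size) {
        return (int) ((totalMatches + size - 1) / size);
    }
}
```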
A typical test case:
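@SpringBootTest
class DepartmentRepositoryTest {
    @Autowired
    private DepartmentRepository repo;

    @Test
    void testQueryWithPagination() {
        // First page (index 0), 5 per page; dname must contain "研发" (R&D)
        PageRequest page = PageRequest.of(0, 5);
        Page<DepartmentDoc> result = repo.findByDnameContainingOrderByDeptnoDesc("研发", page);
        System.out.println("Total pages: " + result.getTotalPages());
        result.getContent().forEach(System.out::println);
    }
}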