通过logstash实现mysql与es的双向数据同步

news2024/11/18 7:45:37

参考题目

  1. 一种基于MySQL和Elasticsearch的数据同步方法及系统
  2. 基于MySQL和Elasticsearch的数据同步方法
  3. 一种基于MySQL和Elasticsearch的数据同步系统
  4. 基于MySQL和Elasticsearch的数据同步技术

目录

1【理论调研】

方案1:使用Logstash实现数据同步

方案2:使用Canal实现数据同步

方案3:使用Debezium实现数据同步

使用其他工具

2【使用Logstash实现MySQL和ES之间的双向数据同步】

2.0【MySQL测试数据库sql导入代码】

2.1【Logstash实现MySQL数据同步至ES】

2.2【Logstash实现ES数据同步至MySQL】

2.2.1【Bug记录】

2.2.2【参考文章】


1【理论调研】

实现MySQL和ES的双向数据同步,可以考虑使用以下几种解决方案:

实现MySQL和Elasticsearch(ES)之间的双向数据同步,需要使用一些工具和技术。以下是一些可能的方法:

方案1:使用Logstash实现数据同步

Logstash是一种流处理工具,可以从不同的来源获取数据并将其转换为指定格式输出到目标存储中,它支持从MySQL数据库读取数据,并将数据写入ES中,也可以从ES中读取数据并将数据写入MySQL数据库中。使用Logstash实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Logstash;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Logstash的输入和输出插件,从MySQL中读取数据并写入ES中,同时从ES中读取数据并写入MySQL中;
  4. 启动Logstash并监控同步过程。

Logstash是一个流处理引擎,可以轻松地将数据从MySQL和ES之间传输。使用Logstash,您可以轻松地将MySQL表的数据导入到ES中,也可以将ES中的数据写回MySQL表中。您可以使用以下配置文件将数据从MySQL同步到ES:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * FROM mytable"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    document_id => "%{id}"
  }
}

这将从MySQL的“mytable”表中选择所有行,并将它们写入名为“myindex”的ES索引中。

如果您想将ES中的数据写回MySQL表中,您可以使用类似以下的配置文件:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "myindex"
    query => '{"query": {"match_all": {}}}'
    scroll => "5m"
    docinfo => true
  }
}

output {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "myuser"
    jdbc_password => "mypassword"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "UPDATE mytable SET myfield = ? WHERE id = ?"
    prepared_statement_bind_values => ["%{myfield}", "%{[@metadata][_id]}"]
  }
}

这将从名为“myindex”的ES索引中选择所有文档,并将它们写回名为“mytable”的MySQL表中。

方案2:使用Canal实现数据同步

Canal是阿里巴巴开源的一款基于数据库增量日志解析,提供增量数据订阅和消费的组件,它支持从MySQL中读取增量数据,并将数据写入ES中,同时支持从ES中读取数据并将数据写入MySQL中。使用Canal实现MySQL和ES的双向数据同步,可以按照以下步骤进行:

  1. 在MySQL和ES上安装Canal;
  2. 配置MySQL和ES的连接信息,包括主机地址、端口、用户名、密码等;
  3. 配置Canal的实例,包括MySQL的binlog信息、ES的索引信息等;
  4. 启动Canal并监控同步过程。

需要注意的是,在使用Logstash或Canal进行数据同步时,可能会出现数据类型不匹配、数据格式错误、数据丢失等问题,需要根据具体情况进行调整和优化。同时,为了确保数据同步的实时性和准确性,可以考虑增加监控和告警机制。

方案3:使用Debezium实现数据同步

Debezium是一个开源的分布式平台,可在数据源和目标之间实现实时数据流。它支持MySQL和ES之间的数据同步,并支持双向同步。使用Debezium,您可以在MySQL和ES之间实时同步数据更改。您可以按照以下步骤使用Debezium进行双向数据同步:

  • 下载并安装Debezium
  • 配置Debezium以监视MySQL表的更改
  • 配置Debezium以将更改写入ES
  • 配置Debezium以监视ES的更改
  • 配置Debezium以将更改写回MySQL

使用其他工具

除了Logstash和Debezium之外,还有一些其他工具可用于MySQL和ES之间的数据同步。例如,您可以使用StreamSets Data Collector或Apache Nifi来将数据从MySQL导入到ES,并将数据从ES写回MySQL。您还可以编写自己的脚本来执行此操作。无论您选择哪种方法,确保您的同步逻辑能够处理。

2【使用Logstash实现MySQL和ES之间的双向数据同步】

软件版本:

  1. logstash -f ../config/newsManager/mysql2es.conf
  2. logstash -f ../config/newsManager/es2mysql.conf

2.0【MySQL测试数据库sql导入代码】

  1. MySQL数据库名称:news_manager
  2. MySQL数据库版本:5.5.40
/*
SQLyog Ultimate v12.09 (64 bit)
MySQL - 5.5.40 : Database - news_manager
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`news_manager` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `news_manager`;

/*Table structure for table `item_user` */

DROP TABLE IF EXISTS `item_user`;

CREATE TABLE `item_user` (
  `item_user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `item_id` int(11) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_user_id`),
  KEY `FK_Reference_2` (`user_id`),
  KEY `FK_Reference_3` (`item_id`),
  CONSTRAINT `FK_Reference_2` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`),
  CONSTRAINT `FK_Reference_3` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `item_user` */

insert  into `item_user`(`item_user_id`,`user_id`,`item_id`,`create_time`,`update_time`,`status`) values (1,1,2,'2020-11-23 11:24:16','2020-11-25 10:27:54',1),(2,2,4,NULL,'2020-11-25 09:38:17',1),(3,1,1,'2020-11-24 09:19:58','2020-11-25 09:38:21',1),(5,1,18,NULL,'2020-11-25 09:44:16',1),(6,1,27,'2020-11-25 11:11:35','2020-11-25 11:11:35',1),(7,1,28,'2020-11-25 11:17:59','2020-11-25 11:17:59',1),(8,1,29,'2020-11-25 11:29:14','2020-11-25 11:29:14',1),(9,1,30,'2020-11-25 11:30:54','2020-11-25 11:30:54',1),(10,1,31,'2020-11-25 11:36:51','2020-11-25 11:36:51',1),(11,1,32,'2020-11-25 16:26:23','2020-11-25 16:26:23',1),(12,1,33,'2020-11-25 16:26:37','2020-11-25 16:26:37',1),(13,1,34,'2020-11-26 10:01:29','2020-11-26 10:01:29',1),(14,1,35,'2020-11-26 10:28:53','2020-11-26 10:28:53',1);

/*Table structure for table `logs_info` */

DROP TABLE IF EXISTS `logs_info`;

CREATE TABLE `logs_info` (
  `logs_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `logs_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`logs_id`),
  KEY `FK_Reference_1` (`user_id`),
  CONSTRAINT `FK_Reference_1` FOREIGN KEY (`user_id`) REFERENCES `user_info` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `logs_info` */

insert  into `logs_info`(`logs_id`,`user_id`,`logs_content`,`create_time`,`update_time`) values (1,1,NULL,NULL,'2020-11-24 09:27:05'),(2,2,NULL,NULL,'2020-11-24 09:27:12'),(3,4,NULL,NULL,'2020-11-23 11:29:06'),(14,1,'woshishenren','2020-11-24 09:24:52','2020-11-24 09:24:52'),(15,1,'woshishenren','2020-11-24 09:25:58','2020-11-24 09:25:58');

/*Table structure for table `news_info` */

DROP TABLE IF EXISTS `news_info`;

CREATE TABLE `news_info` (
  `new_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_id` int(11) DEFAULT NULL,
  `news_title` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `news_image` varchar(255) COLLATE utf8mb4_hungarian_ci DEFAULT NULL,
  `news_content` text COLLATE utf8mb4_hungarian_ci,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`new_id`),
  KEY `FK_Reference_4` (`item_id`),
  CONSTRAINT `FK_Reference_4` FOREIGN KEY (`item_id`) REFERENCES `news_item` (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_info` */

insert  into `news_info`(`new_id`,`item_id`,`news_title`,`news_image`,`news_content`,`create_time`,`update_time`) values (1,2,'蓝桥杯比赛',NULL,NULL,NULL,'2020-11-23 09:27:39'),(2,3,'新学期学费',NULL,NULL,NULL,'2020-11-23 09:28:10'),(3,1,'拔河比赛',NULL,'拔河比赛要使劲!!!','2020-11-25 14:57:28','2020-11-25 14:57:32'),(4,18,'街舞比赛',NULL,'一起摇摆~','2020-11-25 15:54:09','2020-11-25 15:54:11'),(10,27,'数学建模',NULL,'一起加油!','2020-11-25 16:10:02','2020-11-25 22:17:19'),(11,29,'班班唱',NULL,'《走向复兴》','2020-11-25 16:12:23','2020-11-25 16:12:23'),(12,1,'篮球比赛',NULL,'冲冲冲~','2020-11-25 16:13:04','2020-11-25 16:13:04'),(13,1,'NECCS',NULL,'冲呀~','2020-11-25 16:27:22','2020-11-26 08:38:03'),(14,18,'卓见杯',NULL,'啦啦啦~','2020-11-25 17:41:32','2020-11-25 22:17:56'),(15,33,'动则升阳',NULL,'年轻不养生,年老养医生!','2020-11-26 00:12:42','2020-11-26 00:12:42'),(16,33,'11月26日',NULL,'筑基修士','2020-11-26 10:02:20','2020-11-26 10:02:20'),(17,35,'大家好',NULL,'333','2020-11-26 10:29:35','2020-11-26 10:29:35'),(18,35,'大家好!!!',NULL,'333','2020-11-26 10:29:45','2020-11-26 10:29:45'),(19,35,'我是新增数据!',NULL,'我是新增数据!','2023-03-15 16:43:01','2023-03-15 16:43:02');

/*Table structure for table `news_item` */

DROP TABLE IF EXISTS `news_item`;

CREATE TABLE `news_item` (
  `item_id` int(11) NOT NULL AUTO_INCREMENT,
  `item_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`item_id`)
) ENGINE=InnoDB AUTO_INCREMENT=36 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `news_item` */

insert  into `news_item`(`item_id`,`item_name`,`create_time`,`update_time`,`status`) values (1,'呵呵哒','2020-11-24 15:47:00','2020-11-26 10:28:39',1),(2,'党支部','2020-11-24 15:47:03','2020-11-25 14:44:31',0),(3,'分团委','2020-11-24 15:47:05','2020-11-25 14:43:54',1),(4,'院团委','2020-11-24 15:47:08','2020-11-25 14:44:38',1),(5,'111','2020-11-23 15:22:54','2020-11-25 14:45:55',1),(6,'学生会','2020-11-24 09:27:36','2020-11-25 14:46:01',1),(8,'党支部','2020-11-24 13:51:13','2020-11-25 14:46:07',1),(18,'党支部','2020-11-25 09:11:51','2020-11-25 15:49:06',1),(19,'院团委','2020-11-25 10:42:54','2020-11-25 14:46:16',1),(20,'111','2020-11-25 10:54:12','2020-11-25 14:46:19',1),(21,'学生会','2020-11-25 10:56:21','2020-11-25 14:46:35',1),(22,'党支部','2020-11-25 10:57:35','2020-11-25 14:46:43',1),(23,'分团委','2020-11-25 11:00:20','2020-11-25 14:46:48',1),(24,'院团委','2020-11-25 11:00:47','2020-11-25 14:46:55',1),(25,'qweqwe','2020-11-25 11:01:37','2020-11-25 11:01:37',1),(26,'eqweqweqwe','2020-11-25 11:01:53','2020-11-25 11:01:53',1),(27,'分团委','2020-11-25 11:11:35','2020-11-25 15:49:18',1),(28,'sadsads','2020-11-25 11:17:59','2020-11-25 11:18:40',0),(29,'院团委','2020-11-25 11:29:13','2020-11-25 15:49:25',1),(30,'789','2020-11-25 11:30:54','2020-11-25 11:37:19',0),(31,'zyk','2020-11-25 11:36:51','2020-11-25 11:37:19',0),(32,'委员会','2020-11-25 16:26:23','2020-11-26 08:37:48',0),(33,'委员会~~~','2020-11-25 16:26:37','2020-11-26 10:01:40',1),(34,'演示~','2020-11-26 10:01:29','2020-11-26 10:01:33',0),(35,'筑基修士!!!修改!','2020-11-26 10:28:53','2023-03-15 16:44:39',1);

/*Table structure for table `user_info` */

DROP TABLE IF EXISTS `user_info`;

CREATE TABLE `user_info` (
  `user_id` int(11) NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `user_pwd` varchar(255) COLLATE utf8mb4_hungarian_ci NOT NULL,
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `status` int(11) DEFAULT '1' COMMENT '1:启用 0:禁用',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=121 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_hungarian_ci;

/*Data for the table `user_info` */

insert  into `user_info`(`user_id`,`user_name`,`user_pwd`,`create_time`,`update_time`,`status`) values (1,'宋书航','1','2020-11-23 09:30:16','2020-11-25 22:16:25',1),(2,'雨柔子','1','2020-11-23 11:25:41','2020-11-25 22:16:25',1),(4,'王五','1','2020-11-23 11:25:58','2020-11-25 22:16:26',0),(5,'赵柳','1','2020-11-23 11:26:12','2020-11-25 22:16:26',0),(8,'田七','1','2020-11-23 11:26:29','2020-11-25 22:16:27',0),(9,'田七','1','2020-11-23 15:03:23','2020-11-25 22:16:28',0),(10,'田七','1','2020-11-23 15:03:43','2020-11-25 22:16:28',0),(11,'戴沐白','1','2020-11-24 10:45:06','2020-11-25 22:16:29',1),(12,'张小凡','1','2020-11-24 10:45:29','2020-11-25 22:16:29',1),(13,'userName2','1','2020-11-24 10:45:29','2020-11-25 22:16:30',0),(15,'碧瑶','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(16,'赵恋凡','1','2020-11-24 10:45:29','2020-11-25 22:16:31',1),(17,'李长寿','1','2020-11-24 10:45:29','2020-11-25 22:16:32',1),(18,'蓝梦娥','1','2020-11-24 10:45:29','2020-11-25 22:16:33',1),(22,'路明非','123456','2020-11-24 10:45:29','2020-11-25 17:44:31',1),(23,'楚子航','123456','2020-11-24 10:45:29','2020-11-25 22:14:26',1),(33,'乔微尼','123456','2020-11-24 10:45:29','2020-11-25 23:05:48',1),(97,'userName86','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(98,'userName87','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(99,'userName88','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(100,'userName89','123456','2020-11-24 10:45:32','2020-11-24 10:45:32',1),(101,'2020年好运来~','123456','2020-11-24 10:45:32','2020-11-26 10:01:06',1),(102,'333','123456','2020-11-24 10:45:32','2020-11-26 10:28:14',1),(103,'666','888','2020-11-24 10:45:32','2020-11-26 10:28:26',1),(104,'userName93','123456','2020-11-24 10:45:32','2020-11-26 10:00:54',0),(105,'userName94','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(106,'userName95','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(107,'userName96','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(108,'userName97','123456','2020-11-24 10:45:32','2020-11-26 10:00:47',0),(109,'userName98','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(110,'userName99','123456','2020-11-24 10:45:32','2020-11-25 10:12:58',0),(111,'userName100','123456','2020-11-24 10:45:33','2020-11-25 10:12:58',0),(115,'萧潜','1','2020-11-25 23:00:16','2020-11-25 23:00:16',1),(116,'演示视频','haha','2020-11-26 10:00:33','2020-11-26 10:27:37',0),(117,'啦啦啦','1','2020-11-26 10:27:14','2020-11-26 10:27:37',0),(118,'演示视频','222','2020-11-26 10:27:23','2020-11-26 10:27:37',0),(119,'实训小组hyy','111','2020-11-26 14:37:14','2020-11-26 14:37:14',1),(120,'我是新增数据!修改!','1111','2023-04-18 20:38:41','2023-04-18 20:48:13',1);

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

2.1【Logstash实现MySQL数据同步至ES】

先启动es,在启动logstash。

大数据周会-本周学习内容总结06【Linux启动ELK步骤】

input {
	stdin {
    }

    jdbc { # 01
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from item_user"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "item_user"
	}

    jdbc { # 02
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from logs_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "logs_info"
	}

    jdbc { # 03
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_item"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_item"
	}

    jdbc { # 04
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from news_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "news_info"
	}

    jdbc { # 05
		# 配置MySQL数据库链接,变量为数据库名
		jdbc_connection_string => "jdbc:mysql://1.2.3.4:3306/news_manager"
		# 配置MySQL数据库用户名和密码
		jdbc_user => "root"
		jdbc_password => "root"
		# MySQL驱动jar包存放位置
		jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.31.jar"
		# MySQL驱动类名
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# 执行的sql,文件路径+名称:statement_filepath
		# statement_filepath => ""
		# 要执行的sql语句
		statement => "select * from user_info"
		# 设置监听间隔,各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
		schedule => "* * * * *"
		# 索引类型
		type => "user_info"
	}
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
	if[type] == "item_user" { # 01
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_item_user"
			# document_id => "%{id}"
		}
	}

	if[type] == "logs_info" { # 02
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_logs_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_item" { # 03
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_item"
			# document_id => "%{id}"
		}
	}

	if[type] == "news_info" { # 04
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_news_info"
			# document_id => "%{id}"
		}
	}

	if[type] == "user_info" { # 05
		elasticsearch {
			hosts => ["1.2.3.4:9200"]
			index => "test_user_info"
			# document_id => "%{id}"
		}
	}

    stdout {
        codec => json_lines
    }
}

2.2【Logstash实现ES数据同步至MySQL】

  1. logstash-plugin install --no-verify logstash-output-jdbc   # Logstash安装插件logstash-output-jdbc
  2. logstash-plugin list   # 查看Logstash已安装的插件

【es与mysql双向同步-通过logstash将es同步至mysql】功能已实现,但是只进行了简单测试。问题包括但不限于:中文乱码、时间戳字段插入错误等。

input {
	elasticsearch {
		hosts => ["hadoop100:9200"]
		index => "test_user_info"
		query => '{ "query": { "match_all": {} } }'
		schedule => "* * * * *"
	}
}

output {
	jdbc {
		driver_jar_path => "/opt/jar/mysql-connector-java-5.1.31.jar"
		driver_class => "com.mysql.jdbc.Driver"
		# user => "root"
		# password => "root"
		# "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/douyin?autoReconnect=true&user=xxxx@xxxx&password=xxxxx"
		connection_string => "jdbc:mysql://1.2.3.4:3306/school_matriculate?autoReconnect=true&user=root&password=root&useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"
		# statement => ["insert into tb_videos(md5, Id, view, timestamp) values(?,?,?,?)","[md5]", "[Id]", "[view]", "[timestamp]"]
		# statement => ["INSERT INTO user_info (user_name, user_pwd, create_time, update_time, status) VALUES (?, ?, ?, ?, ?)", "[user_name]", "[user_pwd]", "[create_time]", "[update_time]", "[status]"]
		statement => ["INSERT INTO user_info (user_name, user_pwd) VALUES (?, ?)", "[user_name]", "[user_pwd]"]
	}
}

2.2.1【Bug记录】

Unknown setting 'jdbc_user' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'prepared_statement_bind_values' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_password' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_library' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_connection_string' for jdbc

[2023-04-19T20:27:23,656][ERROR][logstash.outputs.jdbc    ] Unknown setting 'jdbc_driver_class' for jdbc

[2023-04-19T20:27:23,669][ERROR][logstash.agent           ] Failed to execute action

java.sql.SQLException: Access denied for user ''@'upward' (using password: NO)

        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1094) ~[mysql-connector-java-5.1.31.jar:?]

[2023-04-19T21:48:53,751][ERROR][com.zaxxer.hikari.pool.HikariPool][main] HikariPool-1 - Exception during pool initialization.

com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.

2.2.2【参考文章】

  1. Java:Logstash如何安装插件logstash-output-jdbc_netyeaxi的博客-CSDN博客
  2. logstash的logstash-output-jdbc插件安装_logstash output jdbc_&捕风的汉子&的博客-CSDN博客
  3. logstash-output-jdbc使用
  4. https://github.com/theangryangel/logstash-output-jdbc

  5. https://www.elastic.co/guide/en/logstash/current/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/437985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring MVC基本认识与操作

SpringMVC是隶属于Spring框架的一部分,主要是用来进行Web开发,是对Servlet进行了封装。 先来介绍三个概念: SpringMVC是处于Web层的框架,所以其主要的作用就是用来接收前端发过来的请求和数据然后经过处理并将处理的结果响应给前…

SpringCloud 微服务系列——【基础与服务注册中心详解】

✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…

文本批量翻译-批量翻译文件名

批量将英文翻译成中文的软件 批量将英文翻译成中文的软件的主要用途场景主要是在需要大量翻译英文文本到中文的场景下使用,例如: 商务文件翻译:许多企业需要将其商务文件,如合同、报告、信函等翻译成中文,以便其中文读…

HTML5 <p> 标签、HTML5 <pre> 标签

HTML5 <p> 标签 实例 HTML5 <p>标签用于定义一个段落。请参考下述示例&#xff1a; 以下代码标记了一个段落&#xff1a; <p>这是一个段落。</p>尝试一下 (在页面下部&#xff0c;您可以找到更多实例) 浏览器支持 目前大多数浏览器支持 <p>标…

java健身房会员签到,会员提醒,留言,消费,公告

1. 主页&#xff1a;即时登录,提供会员和管理员的登录。 2. 会员卡办理&#xff1a;登记健身会员的信息,设置卡到期时间。 3. 会员消费系统&#xff1a;对健身会员的日常消费进行添加。 4. 在线交流&#xff1a;健身会员之间的交流,管理员可以对其问题进行回复。 5.提醒功能&am…

一篇文章快速入门Spring AMQP

文章目录 一、AMQP二、Spring AMQP2.1 介绍2.2 SpringAMQP发送消息2.3 SpringAMQP接收消息2.4 WorkQueue模型2.4.1 概念2.4.2 示例 2.5 发布订阅模型2.5.1 介绍2.5.2 Fanout Exchange2.5.3 Direct Exchange2.5.4 Topic Exchange 2.6 消息转换器2.6.1 介绍2.6.2 切换消息转换器 …

重写equlas时为什么一定要重写hashcode方法?

equals方法和hashCode方法都是Object类中的两个基本方法&#xff0c;它们共同来判断两个对象是否相等。为什么要两个方法结合起来使用呢&#xff1f;原因是在 ‘性能’ 上面。 使用过 hashMap 我们知道&#xff0c;通过 hash 计算 &#xff0c;可以快速的在常量时间内找到某个…

webpack基本认知,它是什么,做什么的

一、基本概述 webpack本质是, 一个第三方模块包, 用于分析, 并打包代码 支持所有类型文件的打包支持less/sass > css支持ES6/7/8 > ES5压缩代码, 提高加载速度 二、安装 创建一个文件并运行以下命令&#xff1a; npm init -y npm i webpack webpack-cli -S 运行命令…

DNS域名解析服务

目录 一、DNS的简介 1&#xff09;DNS 数据结构分布 2&#xff09;服务器的类型 3&#xff09;DNS 域名解析方式 4&#xff09;DNS的查询方式 递归查询 迭代查询 二、DNS配置 1&#xff09;两台主从服务器进行配置操作 ​编辑 2&#xff09;DNS主域名服务器配置&am…

ITSS服务经理 、服务工程师线上开班在即

为了促进企业信息技术服务-运行维护服务能力&#xff0c;全面系统的提升员工的IT服务知识和技能水平&#xff0c;且更好的满足参训企业的时间需求&#xff0c;我司将于5月份开展ITSS服务经理、服务工程师线上班。 日期和形式 五月份&#xff1a;ITSS服务项目经理&#xff1a;…

Qlik Sense 集合表达式详解

文章目录 1 概述2 集合表达式 expression2.1 标识符 identifiers2.2 修饰符 modifiers2.2.1 多值用 &#xff0c;隔开2.2.2 引号区分大小写2.2.3 搜索 2.3 运算符 operators 3 应用 1 概述 #mermaid-svg-bQWKUrD934SlJaj9 {font-family:"trebuchet ms",verdana,arial…

电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展

营造全面规范安全的电子招投标环境&#xff0c;促进招投标市场健康可持续发展 传统采购模式面临的挑战 一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标…

CompletableFuture的基本使用和原理

CompletableFuture CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口&#xff0c;并在此基础上进行了丰富的扩展&#xff0c;完美弥补了Future的局限性&#xff0c;同时CompletableFuture实现了对任务编排的能力。借助这项能力&#xff0c;可以轻…

如何将matlab的m文件转换成python文件

因为matlab的内存实在太大了&#xff0c;所以我只在实验室电脑安装了matlab&#xff0c;自己电脑没有安装&#xff0c;现在跑实验需要把matlab文件转成python文件。在网上找到可以使用smop小工具。 我是在本地的anaconda转换的。先创建一个新环境到指定路径 conda create --pr…

HttpWebRequest类

HttpWebRequest类与HttpRequest类的区别。 HttpRequest类的对象用于服务器端&#xff0c;获取客户端传来的请求的信息&#xff0c;包括HTTP报文传送过来的所有信息。而HttpWebRequest用于客户端&#xff0c;拼接请求的HTTP报文并发送等。 HttpWebRequest这个类非常强大&#…

Spring MVC 接收 json 和返回 json (14)

目录 总入口 测试case 源码分析 1. 针对RequestBody的参数解析 2. 针对 ResponseBody 的返回值处理 总入口 通过上一篇Spring MVC 参数解析&#xff08;13&#xff09;_chen_yao_kerr的博客-CSDN博客的说明&#xff0c;相信大家对Sping MVC的参数解析有了一定的了解&…

2.微服务项目实战---环境搭建,实现电商中商品、订单、用户

使用的电商项目中的商品、订单、用户为案例进行讲解。 2.1 案例准备 2.1.1 技术选型 maven &#xff1a; 3.3.9 数据库&#xff1a; MySQL 5.7 持久层 : SpingData Jpa 其他 : SpringCloud Alibaba 技术栈 2.1.2 模块设计 springcloud-alibaba 父工程 shop-common …

【观察】构建“零信任”架构,筑起制造业安全“护城河”

中国是全球制造业大国&#xff0c;过去40年&#xff0c;中国制造业规模增长了18倍&#xff0c;其附加值达到2.2万亿美元&#xff0c;制造业在中国GDP比重高达40%&#xff0c;其之于中国经济的重要性可见一斑。 与此同时&#xff0c;中国制造业在高速发展的同时&#xff0c;也普…

使用全球融合CDN的10大优势

根据预估&#xff0c;今年的全球内容交付网络&#xff08;CDN&#xff09;市场预计将达到424亿美元。而由于移动应用程序的激增和人工智能尤其是ChatGPT等相关领域的快速发展将进一步带来CDN市场的快速增长&#xff0c;可以说全球CDN的黄金时代才刚开始。 融合CDN和多CDN战略是…

32道子网划分练习题详细解析含答案

目录 1 子网划分概念&#xff1a; 2 划分方法&#xff1a; 子网划分方法&#xff1a;段&#xff0c;块&#xff0c;数的计算三步。 段就是确定ip地址段中既有网络地址&#xff0c;又有主机地址的那一段是四段中的那一段&#xff1f; 块就确定上一步中确定的那一段中的主机…