SpringBatch 是一个轻量级、全面的批处理框架。这里我们用它来实现文件的读取并将读取的结果作处理,处理之后再写入数据库中的功能。
更多的 SpringBatch 功能请移步至 Spring Batch
项目需求
近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西。
我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存入数据库(Mysql)。
所以分为三步:
- 读取文档获得数据
- 对获得的数据进行处理
- 更新数据库(新增或更新)
考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch。
实现步骤
本文假设读者已经能够使用 SpringBoot 连接处理 Mysql,所以这部分文中会省略。
1.创建 Maven 项目,并在 pom.xml 中添加依赖
org.springframework.boot spring-boot-starter-parent 1.5.2.RELEASE 1.8 org.springframework.boot spring-boot-starter-batch org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-test test org.mybatis.spring.boot mybatis-spring-boot-starter 1.2.0 org.projectlombok lombok 1.12.6 org.apache.commons commons-lang3 3.4 mysql mysql-connector-java runtime com.alibaba druid 1.0.26 org.springframework.boot spring-boot-starter-web
这里是这个小项目中用到的所有依赖,包括连接数据库的依赖以及工具类等。
2.编写 Model 类
我们要从文档中读取的有效列就是 uid,tag,type,就是用户 ID,用户可能包含的标签 (用于推送),用户类别(用户用户之间互相推荐)。
UserMap.java 中的 @Entity,@Column 注解,是为了利用 JPA 生成数据表而写的,可要可不要。
UserMap.java
@Data @EqualsAndHashCode @NoArgsConstructor @AllArgsConstructor //@Entity(name = "user_map") public class UserMap extends BaseModel { @Column(name = "uid", unique = true, nullable = false) private Long uid; @Column(name = "tag") private String tag; @Column(name = "type") private Integer type; }
3.实现批处理配置类
BatchConfiguration.java
@Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier("prodDataSource") DataSource prodDataSource; @Bean public FlatFileItemReader reader() { FlatFileItemReader reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("c152.txt")); reader.setLineMapper(new DefaultLineMapper() {{ setLineTokenizer(new DelimitedLineTokenizer("|") {{ setNames(new String[]{"uid", "tag", "type"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(UserMap.class); }}); }}); return reader; } @Bean public JdbcBatchItemWriter importWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)"); writer.setDataSource(prodDataSource); return writer; } @Bean public JdbcBatchItemWriter updateWriter() { JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)"); writer.setDataSource(prodDataSource); return writer; } @Bean public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) { return new UserMapItemProcessor(processStatus); } @Bean public Job importUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(importStep()) .end() .build(); } @Bean public Step importStep() { return stepBuilderFactory.get("importStep") .chunk(100) .reader(reader()) .processor(processor(IMPORT)) .writer(importWriter()) .build(); } @Bean public Job updateUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get("updateUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(updateStep()) .end() .build(); } @Bean public Step updateStep() { return stepBuilderFactory.get("updateStep") .chunk(100) .reader(reader()) .processor(processor(UPDATE)) .writer(updateWriter()) .build(); } }
prodDataSource 是假设用户已经设置好的,如果不知道怎么配置,也可以参考我之前的文章进行配置:Springboot 集成 Mybatis。
reader(),这方法从文件中读取数据,并且设置了一些必要的参数。紧接着是写操作 importWriter() 和 updateWriter(),读者看其中一个就好,因为我这里是需要更新或者修改的,所以分为两个。
processor(ProcessStatus status),该方法是对我们处理数据的类进行实例化,这里我根据 status 是 IMPORT 还是 UPDATE 来获取不同的处理结果。
其他的看代码就可以看懂了,哈哈,不详细说了。
4.将获得的数据进行清洗
UserMapItemProcessor.java
public class UserMapItemProcessor implements ItemProcessor { private static final int MAX_TAG_LENGTH = 200; private ProcessStatus processStatus; public UserMapItemProcessor(ProcessStatus processStatus) { this.processStatus = processStatus; } @Autowired IUserMapService userMapService; private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\u4E00-\u9FA5_-]+$"; public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR); private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class); @Override public UserMap process(UserMap userMap) throws Exception { Long uid = userMap.getUid(); String tag = cleanTag(userMap.getTag()); Integer label = userMap.getType() == null ? Integer.valueOf(0) : userMap.getType(); if (StringUtils.isNotBlank(tag)) { Map params = new HashMap<>(); params.put("uid", uid); UserMap userMapFromDB = userMapService.selectOne(params); if (userMapFromDB == null) { if (this.processStatus == ProcessStatus.IMPORT) { return new UserMap(uid, tag, label); } } else { if (this.processStatus == ProcessStatus.UPDATE) { if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) { userMapFromDB.setType(label); userMapFromDB.setTag(tag); return userMapFromDB; } } } } return null; } /** * 清洗标签 * * @param tag * @return */ private static String cleanTag(String tag) { if (StringUtils.isNotBlank(tag)) { try { tag = tag.substring(tag.indexOf("{") + 1, tag.lastIndexOf("}")); String[] tagArray = tag.split(","); Optional reduce = Arrays.stream(tagArray).parallel() .map(str -> str.split(":")[0]) .map(str -> str.replaceAll("\'", "")) .map(str -> str.replaceAll(" ", "")) .filter(str -> TAG_PATTERN.matcher(str).matches()) .reduce((x, y) -> x + "," + y); Function str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring(0, MAX_TAG_LENGTH) : s); return str.apply(reduce.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); } } return null; } protected enum ProcessStatus { IMPORT, UPDATE; } public static void main(String[] args) { String distinctTag = cleanTag("Counter({'《重新定义》': 3, '轻想上的轻小说': 3, '小说': 2, 'Fate': 2, '同人小说': 2, '雪狼八组': 1, " + "'社会': 1, '人文': 1, '短篇': 1, '重新定义': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六组》': 1, '战争': 1, '《灰羽联盟》': 1, " + "'谁说轻想没人写小说': 1})"); System.out.println(distinctTag); } }
读取到的数据格式如 main() 方法所示,清理之后的结果如:
轻想上的轻小说,小说,Fate,同人小说,雪狼八组,社会,人文,短篇,重新定义,AMV,战争,谁说轻想没人写小说 。
去掉了特殊符号以及数字等。使用了 Java8 的 Lambda 表达式。
并且这里在处理的时候,判断如果该数据用户已经存在,则进行更新,如果不存在,则新增。
5.Job 执行结束回调类
JobCompletionNotificationListener.java
@Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { System.out.println("end ....."); } }
具体的逻辑可自行实现。
完成以上几个步骤,运行项目,就可以读取并写入数据到数据库了。
作者:Be a funny man
转载请注明出处
发自网易邮箱大师