延迟任务精准发布文章

1)文章定时发布

2)延迟任务概述

2.1)什么是延迟任务

  • 定时任务:有固定周期的,有明确的触发时间
  • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟

pkTTxeA.png

应用场景:

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

2.2)技术对比

2.2.1)DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素

pkT79FP.png

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

实现:

1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

3:循环的从延迟队列中拉取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DelayedTask  implements Delayed{

// 任务的执行时间
private int executeTime = 0;

public DelayedTask(int delay){
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND,delay);
this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
}

/**
* 元素在队列中的剩余时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
Calendar calendar = Calendar.getInstance();
return executeTime - (calendar.getTimeInMillis()/1000);
}

/**
* 元素排序
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return val == 0 ? 0 : ( val < 0 ? -1: 1 );
}


public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();

queue.add(new DelayedTask(5));
queue.add(new DelayedTask(10));
queue.add(new DelayedTask(15));

System.out.println(System.currentTimeMillis()/1000+" start consume ");
while(queue.size() != 0){
DelayedTask delayedTask = queue.poll();
if(delayedTask !=null ){
System.out.println(System.currentTimeMillis()/1000+" cosume task");
}
//每隔一秒消费一次
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

DelayQueue实现完成之后思考一个问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)

2.2.2)RabbitMQ实现延迟任务
  • TTL:Time To Live (消息存活时间)

  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

pkT7CJf.png

2.2.3)redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

pkT7PW8.png

3)redis实现延迟任务

实现思路

pkT7FSS.png

问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

4)延迟任务服务实现

4.1)搭leadnews-schedule模块

4.2)数据库准备

taskinfo 任务表

pkT7Zes.png

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;

/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;

/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;

/**
* 优先级
*/
@TableField("priority")
private Integer priority;

/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;


}

taskinfo_logs 任务日志表

pkT7ewn.png

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.heima.model.schedule.pojos;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;

/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;

/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;

/**
* 优先级
*/
@TableField("priority")
private Integer priority;

/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;

/**
* 版本号,用乐观锁
*/
@Version
private Integer version;

/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;


}

乐观锁支持:

1
2
3
4
5
6
7
8
9
10
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}

4.4)项目集成redis

① 在项目导入redis相关依赖,已经完成

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

1
2
3
4
5
spring:
redis:
host: 192.168.200.130
password: leadnews
port: 6379

③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

操作缓存的工具类:

image-20210514181214681

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
package com.heima.schedule.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Component
public class CacheService extends CachingConfigurerSupport {
@Autowired
private StringRedisTemplate stringRedisTemplate;

public StringRedisTemplate getstringRedisTemplate() {
return this.stringRedisTemplate;
}

/** -------------------key相关操作--------------------- */

/**
* 删除key
*
* @param key
*/
public void delete(String key) {
stringRedisTemplate.delete(key);
}

/**
* 批量删除key
*
* @param keys
*/
public void delete(Collection<String> keys) {
stringRedisTemplate.delete(keys);
}

/**
* 序列化key
*
* @param key
* @return
*/
public byte[] dump(String key) {
return stringRedisTemplate.dump(key);
}

/**
* 是否存在key
*
* @param key
* @return
*/
public Boolean exists(String key) {
return stringRedisTemplate.hasKey(key);
}

/**
* 设置过期时间
*
* @param key
* @param timeout
* @param unit
* @return
*/
public Boolean expire(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.expire(key, timeout, unit);
}

/**
* 设置过期时间
*
* @param key
* @param date
* @return
*/
public Boolean expireAt(String key, Date date) {
return stringRedisTemplate.expireAt(key, date);
}

/**
* 查找匹配的key
*
* @param pattern
* @return
*/
public Set<String> keys(String pattern) {
return stringRedisTemplate.keys(pattern);
}

/**
* 将当前数据库的 key 移动到给定的数据库 db 当中
*
* @param key
* @param dbIndex
* @return
*/
public Boolean move(String key, int dbIndex) {
return stringRedisTemplate.move(key, dbIndex);
}

/**
* 移除 key 的过期时间,key 将持久保持
*
* @param key
* @return
*/
public Boolean persist(String key) {
return stringRedisTemplate.persist(key);
}

/**
* 返回 key 的剩余的过期时间
*
* @param key
* @param unit
* @return
*/
public Long getExpire(String key, TimeUnit unit) {
return stringRedisTemplate.getExpire(key, unit);
}

/**
* 返回 key 的剩余的过期时间
*
* @param key
* @return
*/
public Long getExpire(String key) {
return stringRedisTemplate.getExpire(key);
}

/**
* 从当前数据库中随机返回一个 key
*
* @return
*/
public String randomKey() {
return stringRedisTemplate.randomKey();
}

/**
* 修改 key 的名称
*
* @param oldKey
* @param newKey
*/
public void rename(String oldKey, String newKey) {
stringRedisTemplate.rename(oldKey, newKey);
}

/**
* 仅当 newkey 不存在时,将 oldKey 改名为 newkey
*
* @param oldKey
* @param newKey
* @return
*/
public Boolean renameIfAbsent(String oldKey, String newKey) {
return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
}

/**
* 返回 key 所储存的值的类型
*
* @param key
* @return
*/
public DataType type(String key) {
return stringRedisTemplate.type(key);
}

/** -------------------string相关操作--------------------- */

/**
* 设置指定 key 的值
* @param key
* @param value
*/
public void set(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}

/**
* 获取指定 key 的值
* @param key
* @return
*/
public String get(String key) {
return stringRedisTemplate.opsForValue().get(key);
}

/**
* 返回 key 中字符串值的子字符
* @param key
* @param start
* @param end
* @return
*/
public String getRange(String key, long start, long end) {
return stringRedisTemplate.opsForValue().get(key, start, end);
}

/**
* 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
*
* @param key
* @param value
* @return
*/
public String getAndSet(String key, String value) {
return stringRedisTemplate.opsForValue().getAndSet(key, value);
}

/**
* 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
*
* @param key
* @param offset
* @return
*/
public Boolean getBit(String key, long offset) {
return stringRedisTemplate.opsForValue().getBit(key, offset);
}

/**
* 批量获取
*
* @param keys
* @return
*/
public List<String> multiGet(Collection<String> keys) {
return stringRedisTemplate.opsForValue().multiGet(keys);
}

/**
* 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
*
* @param key
* @param
* @param value
* 值,true为1, false为0
* @return
*/
public boolean setBit(String key, long offset, boolean value) {
return stringRedisTemplate.opsForValue().setBit(key, offset, value);
}

/**
* 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
*
* @param key
* @param value
* @param timeout
* 过期时间
* @param unit
* 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
* 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
*/
public void setEx(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
}

/**
* 只有在 key 不存在时设置 key 的值
*
* @param key
* @param value
* @return 之前已经存在返回false,不存在返回true
*/
public boolean setIfAbsent(String key, String value) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
}

/**
* 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
*
* @param key
* @param value
* @param offset
* 从指定位置开始覆写
*/
public void setRange(String key, String value, long offset) {
stringRedisTemplate.opsForValue().set(key, value, offset);
}

/**
* 获取字符串的长度
*
* @param key
* @return
*/
public Long size(String key) {
return stringRedisTemplate.opsForValue().size(key);
}

/**
* 批量添加
*
* @param maps
*/
public void multiSet(Map<String, String> maps) {
stringRedisTemplate.opsForValue().multiSet(maps);
}

/**
* 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
*
* @param maps
* @return 之前已经存在返回false,不存在返回true
*/
public boolean multiSetIfAbsent(Map<String, String> maps) {
return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
}

/**
* 增加(自增长), 负数则为自减
*
* @param key
* @param
* @return
*/
public Long incrBy(String key, long increment) {
return stringRedisTemplate.opsForValue().increment(key, increment);
}

/**
*
* @param key
* @param
* @return
*/
public Double incrByFloat(String key, double increment) {
return stringRedisTemplate.opsForValue().increment(key, increment);
}

/**
* 追加到末尾
*
* @param key
* @param value
* @return
*/
public Integer append(String key, String value) {
return stringRedisTemplate.opsForValue().append(key, value);
}

/** -------------------hash相关操作------------------------- */

/**
* 获取存储在哈希表中指定字段的值
*
* @param key
* @param field
* @return
*/
public Object hGet(String key, String field) {
return stringRedisTemplate.opsForHash().get(key, field);
}

/**
* 获取所有给定字段的值
*
* @param key
* @return
*/
public Map<Object, Object> hGetAll(String key) {
return stringRedisTemplate.opsForHash().entries(key);
}

/**
* 获取所有给定字段的值
*
* @param key
* @param fields
* @return
*/
public List<Object> hMultiGet(String key, Collection<Object> fields) {
return stringRedisTemplate.opsForHash().multiGet(key, fields);
}

public void hPut(String key, String hashKey, String value) {
stringRedisTemplate.opsForHash().put(key, hashKey, value);
}

public void hPutAll(String key, Map<String, String> maps) {
stringRedisTemplate.opsForHash().putAll(key, maps);
}

/**
* 仅当hashKey不存在时才设置
*
* @param key
* @param hashKey
* @param value
* @return
*/
public Boolean hPutIfAbsent(String key, String hashKey, String value) {
return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
}

/**
* 删除一个或多个哈希表字段
*
* @param key
* @param fields
* @return
*/
public Long hDelete(String key, Object... fields) {
return stringRedisTemplate.opsForHash().delete(key, fields);
}

/**
* 查看哈希表 key 中,指定的字段是否存在
*
* @param key
* @param field
* @return
*/
public boolean hExists(String key, String field) {
return stringRedisTemplate.opsForHash().hasKey(key, field);
}

/**
* 为哈希表 key 中的指定字段的整数值加上增量 increment
*
* @param key
* @param field
* @param increment
* @return
*/
public Long hIncrBy(String key, Object field, long increment) {
return stringRedisTemplate.opsForHash().increment(key, field, increment);
}

/**
* 为哈希表 key 中的指定字段的整数值加上增量 increment
*
* @param key
* @param field
* @param delta
* @return
*/
public Double hIncrByFloat(String key, Object field, double delta) {
return stringRedisTemplate.opsForHash().increment(key, field, delta);
}

/**
* 获取所有哈希表中的字段
*
* @param key
* @return
*/
public Set<Object> hKeys(String key) {
return stringRedisTemplate.opsForHash().keys(key);
}

/**
* 获取哈希表中字段的数量
*
* @param key
* @return
*/
public Long hSize(String key) {
return stringRedisTemplate.opsForHash().size(key);
}

/**
* 获取哈希表中所有值
*
* @param key
* @return
*/
public List<Object> hValues(String key) {
return stringRedisTemplate.opsForHash().values(key);
}

/**
* 迭代哈希表中的键值对
*
* @param key
* @param options
* @return
*/
public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForHash().scan(key, options);
}

/** ------------------------list相关操作---------------------------- */

/**
* 通过索引获取列表中的元素
*
* @param key
* @param index
* @return
*/
public String lIndex(String key, long index) {
return stringRedisTemplate.opsForList().index(key, index);
}

/**
* 获取列表指定范围内的元素
*
* @param key
* @param start
* 开始位置, 0是开始位置
* @param end
* 结束位置, -1返回所有
* @return
*/
public List<String> lRange(String key, long start, long end) {
return stringRedisTemplate.opsForList().range(key, start, end);
}

/**
* 存储在list头部
*
* @param key
* @param value
* @return
*/
public Long lLeftPush(String key, String value) {
return stringRedisTemplate.opsForList().leftPush(key, value);
}

/**
*
* @param key
* @param value
* @return
*/
public Long lLeftPushAll(String key, String... value) {
return stringRedisTemplate.opsForList().leftPushAll(key, value);
}

/**
*
* @param key
* @param value
* @return
*/
public Long lLeftPushAll(String key, Collection<String> value) {
return stringRedisTemplate.opsForList().leftPushAll(key, value);
}

/**
* 当list存在的时候才加入
*
* @param key
* @param value
* @return
*/
public Long lLeftPushIfPresent(String key, String value) {
return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
}

/**
* 如果pivot存在,再pivot前面添加
*
* @param key
* @param pivot
* @param value
* @return
*/
public Long lLeftPush(String key, String pivot, String value) {
return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
}

/**
*
* @param key
* @param value
* @return
*/
public Long lRightPush(String key, String value) {
return stringRedisTemplate.opsForList().rightPush(key, value);
}

/**
*
* @param key
* @param value
* @return
*/
public Long lRightPushAll(String key, String... value) {
return stringRedisTemplate.opsForList().rightPushAll(key, value);
}

/**
*
* @param key
* @param value
* @return
*/
public Long lRightPushAll(String key, Collection<String> value) {
return stringRedisTemplate.opsForList().rightPushAll(key, value);
}

/**
* 为已存在的列表添加值
*
* @param key
* @param value
* @return
*/
public Long lRightPushIfPresent(String key, String value) {
return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
}

/**
* 在pivot元素的右边添加值
*
* @param key
* @param pivot
* @param value
* @return
*/
public Long lRightPush(String key, String pivot, String value) {
return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
}

/**
* 通过索引设置列表元素的值
*
* @param key
* @param index
* 位置
* @param value
*/
public void lSet(String key, long index, String value) {
stringRedisTemplate.opsForList().set(key, index, value);
}

/**
* 移出并获取列表的第一个元素
*
* @param key
* @return 删除的元素
*/
public String lLeftPop(String key) {
return stringRedisTemplate.opsForList().leftPop(key);
}

/**
* 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param key
* @param timeout
* 等待时间
* @param unit
* 时间单位
* @return
*/
public String lBLeftPop(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
}

/**
* 移除并获取列表最后一个元素
*
* @param key
* @return 删除的元素
*/
public String lRightPop(String key) {
return stringRedisTemplate.opsForList().rightPop(key);
}

/**
* 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param key
* @param timeout
* 等待时间
* @param unit
* 时间单位
* @return
*/
public String lBRightPop(String key, long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
}

/**
* 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
*
* @param sourceKey
* @param destinationKey
* @return
*/
public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
destinationKey);
}

/**
* 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
*
* @param sourceKey
* @param destinationKey
* @param timeout
* @param unit
* @return
*/
public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
long timeout, TimeUnit unit) {
return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
destinationKey, timeout, unit);
}

/**
* 删除集合中值等于value得元素
*
* @param key
* @param index
* index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
* index<0, 从尾部开始删除第一个值等于value的元素;
* @param value
* @return
*/
public Long lRemove(String key, long index, String value) {
return stringRedisTemplate.opsForList().remove(key, index, value);
}

/**
* 裁剪list
*
* @param key
* @param start
* @param end
*/
public void lTrim(String key, long start, long end) {
stringRedisTemplate.opsForList().trim(key, start, end);
}

/**
* 获取列表长度
*
* @param key
* @return
*/
public Long lLen(String key) {
return stringRedisTemplate.opsForList().size(key);
}


/** --------------------set相关操作-------------------------- */

/**
* set添加元素
*
* @param key
* @param values
* @return
*/
public Long sAdd(String key, String... values) {
return stringRedisTemplate.opsForSet().add(key, values);
}

/**
* set移除元素
*
* @param key
* @param values
* @return
*/
public Long sRemove(String key, Object... values) {
return stringRedisTemplate.opsForSet().remove(key, values);
}

/**
* 移除并返回集合的一个随机元素
*
* @param key
* @return
*/
public String sPop(String key) {
return stringRedisTemplate.opsForSet().pop(key);
}

/**
* 将元素value从一个集合移到另一个集合
*
* @param key
* @param value
* @param destKey
* @return
*/
public Boolean sMove(String key, String value, String destKey) {
return stringRedisTemplate.opsForSet().move(key, value, destKey);
}

/**
* 获取集合的大小
*
* @param key
* @return
*/
public Long sSize(String key) {
return stringRedisTemplate.opsForSet().size(key);
}

/**
* 判断集合是否包含value
*
* @param key
* @param value
* @return
*/
public Boolean sIsMember(String key, Object value) {
return stringRedisTemplate.opsForSet().isMember(key, value);
}

/**
* 获取两个集合的交集
*
* @param key
* @param otherKey
* @return
*/
public Set<String> sIntersect(String key, String otherKey) {
return stringRedisTemplate.opsForSet().intersect(key, otherKey);
}

/**
* 获取key集合与多个集合的交集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sIntersect(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
}

/**
* key集合与otherKey集合的交集存储到destKey集合中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sIntersectAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey,
destKey);
}

/**
* key集合与多个集合的交集存储到destKey集合中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sIntersectAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys,
destKey);
}

/**
* 获取两个集合的并集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sUnion(String key, String otherKeys) {
return stringRedisTemplate.opsForSet().union(key, otherKeys);
}

/**
* 获取key集合与多个集合的并集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sUnion(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().union(key, otherKeys);
}

/**
* key集合与otherKey集合的并集存储到destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sUnionAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
}

/**
* key集合与多个集合的并集存储到destKey中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sUnionAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
}

/**
* 获取两个集合的差集
*
* @param key
* @param otherKey
* @return
*/
public Set<String> sDifference(String key, String otherKey) {
return stringRedisTemplate.opsForSet().difference(key, otherKey);
}

/**
* 获取key集合与多个集合的差集
*
* @param key
* @param otherKeys
* @return
*/
public Set<String> sDifference(String key, Collection<String> otherKeys) {
return stringRedisTemplate.opsForSet().difference(key, otherKeys);
}

/**
* key集合与otherKey集合的差集存储到destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long sDifference(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey,
destKey);
}

/**
* key集合与多个集合的差集存储到destKey中
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long sDifference(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys,
destKey);
}

/**
* 获取集合所有元素
*
* @param key
* @param
* @param
* @return
*/
public Set<String> setMembers(String key) {
return stringRedisTemplate.opsForSet().members(key);
}

/**
* 随机获取集合中的一个元素
*
* @param key
* @return
*/
public String sRandomMember(String key) {
return stringRedisTemplate.opsForSet().randomMember(key);
}

/**
* 随机获取集合中count个元素
*
* @param key
* @param count
* @return
*/
public List<String> sRandomMembers(String key, long count) {
return stringRedisTemplate.opsForSet().randomMembers(key, count);
}

/**
* 随机获取集合中count个元素并且去除重复的
*
* @param key
* @param count
* @return
*/
public Set<String> sDistinctRandomMembers(String key, long count) {
return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
}

/**
*
* @param key
* @param options
* @return
*/
public Cursor<String> sScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForSet().scan(key, options);
}

/**------------------zSet相关操作--------------------------------*/

/**
* 添加元素,有序集合是按照元素的score值由小到大排列
*
* @param key
* @param value
* @param score
* @return
*/
public Boolean zAdd(String key, String value, double score) {
return stringRedisTemplate.opsForZSet().add(key, value, score);
}

/**
*
* @param key
* @param values
* @return
*/
public Long zAdd(String key, Set<TypedTuple<String>> values) {
return stringRedisTemplate.opsForZSet().add(key, values);
}

/**
*
* @param key
* @param values
* @return
*/
public Long zRemove(String key, Object... values) {
return stringRedisTemplate.opsForZSet().remove(key, values);
}

public Long zRemove(String key, Collection<String> values) {
if(values!=null&&!values.isEmpty()){
Object[] objs = values.toArray(new Object[values.size()]);
return stringRedisTemplate.opsForZSet().remove(key, objs);
}
return 0L;
}

/**
* 增加元素的score值,并返回增加后的值
*
* @param key
* @param value
* @param delta
* @return
*/
public Double zIncrementScore(String key, String value, double delta) {
return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
}

/**
* 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
*
* @param key
* @param value
* @return 0表示第一位
*/
public Long zRank(String key, Object value) {
return stringRedisTemplate.opsForZSet().rank(key, value);
}

/**
* 返回元素在集合的排名,按元素的score值由大到小排列
*
* @param key
* @param value
* @return
*/
public Long zReverseRank(String key, Object value) {
return stringRedisTemplate.opsForZSet().reverseRank(key, value);
}

/**
* 获取集合的元素, 从小到大排序
*
* @param key
* @param start
* 开始位置
* @param end
* 结束位置, -1查询所有
* @return
*/
public Set<String> zRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().range(key, start, end);
}

/**
* 获取zset集合的所有元素, 从小到大排序
*
*/
public Set<String> zRangeAll(String key) {
return zRange(key,0,-1);
}

/**
* 获取集合元素, 并且把score值也获取
*
* @param key
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
long end) {
return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
}

/**
* 根据Score值查询集合元素
*
* @param key
* @param min
* 最小值
* @param max
* 最大值
* @return
*/
public Set<String> zRangeByScore(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
}


/**
* 根据Score值查询集合元素, 从小到大排序
*
* @param key
* @param min
* 最小值
* @param max
* 最大值
* @return
*/
public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
double min, double max) {
return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
}

/**
*
* @param key
* @param min
* @param max
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
double min, double max, long start, long end) {
return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
start, end);
}

/**
* 获取集合的元素, 从大到小排序
*
* @param key
* @param start
* @param end
* @return
*/
public Set<String> zReverseRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);

}

public Set<String> zReverseRangeByScore(String key, long min, long max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);

}

/**
* 获取集合的元素, 从大到小排序, 并返回score值
*
* @param key
* @param start
* @param end
* @return
*/
public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start,
end);
}

/**
* 根据Score值查询集合元素, 从大到小排序
*
* @param key
* @param min
* @param max
* @return
*/
public Set<String> zReverseRangeByScore(String key, double min,
double max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
}

/**
* 根据Score值查询集合元素, 从大到小排序
*
* @param key
* @param min
* @param max
* @return
*/
public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
String key, double min, double max) {
return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
min, max);
}

/**
*
* @param key
* @param min
* @param max
* @param start
* @param end
* @return
*/
public Set<String> zReverseRangeByScore(String key, double min,
double max, long start, long end) {
return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
start, end);
}

/**
* 根据score值获取集合元素数量
*
* @param key
* @param min
* @param max
* @return
*/
public Long zCount(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().count(key, min, max);
}

/**
* 获取集合大小
*
* @param key
* @return
*/
public Long zSize(String key) {
return stringRedisTemplate.opsForZSet().size(key);
}

/**
* 获取集合大小
*
* @param key
* @return
*/
public Long zZCard(String key) {
return stringRedisTemplate.opsForZSet().zCard(key);
}

/**
* 获取集合中value元素的score值
*
* @param key
* @param value
* @return
*/
public Double zScore(String key, Object value) {
return stringRedisTemplate.opsForZSet().score(key, value);
}

/**
* 移除指定索引位置的成员
*
* @param key
* @param start
* @param end
* @return
*/
public Long zRemoveRange(String key, long start, long end) {
return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
}

/**
* 根据指定的score值的范围来移除成员
*
* @param key
* @param min
* @param max
* @return
*/
public Long zRemoveRangeByScore(String key, double min, double max) {
return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
}

/**
* 获取key和otherKey的并集并存储在destKey中
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long zUnionAndStore(String key, String otherKey, String destKey) {
return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
}

/**
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long zUnionAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForZSet()
.unionAndStore(key, otherKeys, destKey);
}

/**
* 交集
*
* @param key
* @param otherKey
* @param destKey
* @return
*/
public Long zIntersectAndStore(String key, String otherKey,
String destKey) {
return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey,
destKey);
}

/**
* 交集
*
* @param key
* @param otherKeys
* @param destKey
* @return
*/
public Long zIntersectAndStore(String key, Collection<String> otherKeys,
String destKey) {
return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
destKey);
}

/**
*
* @param key
* @param options
* @return
*/
public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
return stringRedisTemplate.opsForZSet().scan(key, options);
}

/**
* 扫描主键,建议使用
* @param patten
* @return
*/
public Set<String> scan(String patten){
Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
Set<String> result = new HashSet<>();
try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
.match(patten).count(10000).build())) {
while (cursor.hasNext()) {
result.add(new String(cursor.next()));
}
} catch (IOException e) {
e.printStackTrace();
}
return result;
});
return keys;
}

/**
* 管道技术,提高性能
* @param type
* @param values
* @return
*/
public List<Object> lRightPushPipeline(String type,Collection<String> values){
List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
//集合转换数组
String[] strings = values.toArray(new String[values.size()]);
//直接批量发送
stringRedisConn.rPush(type, strings);
return null;
}
});
return results;
}

public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){

List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
String[] strings = values.toArray(new String[values.size()]);
stringRedisConnection.rPush(topic_key,strings);
stringRedisConnection.zRem(future_key,strings);
return null;
}
});
return objects;
}

}

④:测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.heima.schedule.test;


import com.heima.common.redis.CacheService;
import com.heima.schedule.ScheduleApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Set;


@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {

@Autowired
private CacheService cacheService;

@Test
public void testList(){

//在list的左边添加元素
// cacheService.lLeftPush("list_001","hello,redis");

//在list的右边获取元素,并删除
String list_001 = cacheService.lRightPop("list_001");
System.out.println(list_001);
}

@Test
public void testZset(){
//添加数据到zset中 分值
/*cacheService.zAdd("zset_key_001","hello zset 001",1000);
cacheService.zAdd("zset_key_001","hello zset 002",8888);
cacheService.zAdd("zset_key_001","hello zset 003",7777);
cacheService.zAdd("zset_key_001","hello zset 004",999999);*/

//按照分值获取数据
Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
System.out.println(zset_key_001);


}
}

4.5)添加任务

①:拷贝mybatis-plus生成的文件,mapper

②:创建task类,用于接收添加任务的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.heima.model.schedule.dtos;

import lombok.Data;

import java.io.Serializable;

@Data
public class Task implements Serializable {

/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskType;

/**
* 优先级
*/
private Integer priority;

/**
* 执行id
*/
private long executeTime;

/**
* task参数
*/
private byte[] parameters;

}

③:创建TaskService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.heima.schedule.service;

import com.heima.model.schedule.dtos.Task;

/**
* 对外访问接口
*/
public interface TaskService {

/**
* 添加任务
* @param task 任务对象
* @return 任务id
*/
public long addTask(Task task) ;

}

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ScheduleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Calendar;
import java.util.Date;

@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {
/**
* 添加延迟任务
*
* @param task
* @return
*/
@Override
public long addTask(Task task) {
//1.添加任务到数据库中

boolean success = addTaskToDb(task);

if (success) {
//2.添加任务到redis
addTaskToCache(task);
}


return task.getTaskId();
}

@Autowired
private CacheService cacheService;

/**
* 把任务添加到redis中
*
* @param task
*/
private void addTaskToCache(Task task) {

String key = task.getTaskType() + "_" + task.getPriority();

//获取5分钟之后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduleTime = calendar.getTimeInMillis();

//2.1 如果任务的执行时间小于等于当前时间,存入list
if (task.getExecuteTime() <= System.currentTimeMillis()) {
cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
} else if (task.getExecuteTime() <= nextScheduleTime) {
//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
}


}

@Autowired
private TaskinfoMapper taskinfoMapper;

@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;

/**
* 添加任务到数据库中
*
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {

boolean flag = false;

try {
//保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);

//设置taskID
task.setTaskId(taskinfo.getTaskId());

//保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);

flag = true;
} catch (Exception e) {
e.printStackTrace();
}

return flag;
}
}

ScheduleConstants常量类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.heima.common.constants;

public class ScheduleConstants {

//task状态
public static final int SCHEDULED=0; //初始化状态

public static final int EXECUTED=1; //已执行状态

public static final int CANCELLED=2; //已取消状态

public static String FUTURE="future_"; //未来数据key前缀

public static String TOPIC="topic_"; //当前数据key前缀
}

④:测试

4.6)取消任务

在TaskService中添加方法

1
2
3
4
5
6
7
/**
* 取消任务
* @param taskId 任务id
* @return 取消结果
*/
public boolean cancelTask(long taskId);

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 取消任务
* @param taskId
* @return
*/
@Override
public boolean cancelTask(long taskId) {

boolean flag = false;

//删除任务,更新日志
Task task = updateDb(taskId,ScheduleConstants.EXECUTED);

//删除redis的数据
if(task != null){
removeTaskFromCache(task);
flag = true;
}



return false;
}

/**
* 删除redis中的任务数据
* @param task
*/
private void removeTaskFromCache(Task task) {

String key = task.getTaskType()+"_"+task.getPriority();

if(task.getExecuteTime()<=System.currentTimeMillis()){
cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
}else {
cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
}
}

/**
* 删除任务,更新任务日志状态
* @param taskId
* @param status
* @return
*/
private Task updateDb(long taskId, int status) {
Task task = null;
try {
//删除任务
taskinfoMapper.deleteById(taskId);

TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(status);
taskinfoLogsMapper.updateById(taskinfoLogs);

task = new Task();
BeanUtils.copyProperties(taskinfoLogs,task);
task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
}catch (Exception e){
log.error("task cancel exception taskid={}",taskId);
}

return task;

}

测试

4.7)消费任务

在TaskService中添加方法

1
2
3
4
5
6
7
/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
public Task poll(int type,int priority);

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 按照类型和优先级拉取任务
* @return
*/
@Override
public Task poll(int type,int priority) {
Task task = null;
try {
String key = type+"_"+priority;
String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
if(StringUtils.isNotBlank(task_json)){
task = JSON.parseObject(task_json, Task.class);
//更新数据库信息
updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
}
}catch (Exception e){
e.printStackTrace();
log.error("poll task exception");
}

return task;
}

4.8)未来数据定时刷新

4.8.1)reids key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

pk7pdhQ.png

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

pk7p0pj.png

代码案例:

1
2
3
4
5
6
7
8
@Test
public void testKeys(){
Set<String> keys = cacheService.keys("future_*");
System.out.println(keys);

Set<String> scan = cacheService.scan("future_*");
System.out.println(scan);
}
4.8.2)reids管道

普通redis客户端和服务器交互模式

pk7pB1s.png

Pipeline请求模型

pk7pyn0.png

官方测试结果数据对比

pk7pRNF.png

测试案例对比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//耗时6151
@Test
public void testPiple1(){
long start =System.currentTimeMillis();
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
}
System.out.println("耗时"+(System.currentTimeMillis()- start));
}


@Test
public void testPiple2(){
long start = System.currentTimeMillis();
//使用管道技术
List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
}
return null;
}
});
System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}
4.8.3)未来数据定时刷新-功能完成

在TaskService中添加方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");

// 获取所有未来数据集合的key值
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
for (String futureKey : futureKeys) { // future_250_250

String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()) {
//将这些任务数据添加到消费者队列中
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}

在引导类中添加开启任务调度注解:@EnableScheduling

4.9)分布式锁解决集群下的方法抢占执行

4.9.1)问题描述

启动两台eadnews-schedule服务,每台服务都会去执行refresh定时任务方法

4.9.2)分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

pk7p5cR.png

4.9.3)redis分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

pk7pIj1.png

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
  • 客户端A执行代码完成,删除锁
  • 客户端B在等待一段时间后再去请求设置key的值,设置成功
  • 客户端B执行代码完成,删除锁
4.9.4)在工具类CacheService中添加方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {

//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
Boolean result = conn.set(
name.getBytes(),
token.getBytes(),
Expiration.from(expire, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}

修改未来数据定时刷新的方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){

String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
if(StringUtils.isNotBlank(token)){
log.info("未来数据定时刷新---定时任务");

//获取所有未来数据的集合key
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : futureKeys) {//future_100_50

//获取当前数据的key topic
String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];

//按照key和分值查询符合条件的数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

//同步数据
if(!tasks.isEmpty()){
cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
log.info("成功的将"+futureKey+"刷新到了"+topicKey);
}
}
}
}

4.10)数据库同步到redis

pk7pTnx.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
clearCache();
log.info("数据库数据同步到缓存");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);

//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
if(allTasks != null && allTasks.size() > 0){
for (Taskinfo taskinfo : allTasks) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo,task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
}

private void clearCache(){
// 删除缓存中未来数据集合和当前消费者队列的所有key
Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
cacheService.delete(futurekeys);
cacheService.delete(topickeys);
}

5)延迟队列解决精准时间发布文章

5.1)延迟队列服务提供对外接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.heima.apis.schedule;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient("leadnews-schedule")
public interface IScheduleClient {

/**
* 添加任务
* @param task 任务对象
* @return 任务id
*/
@PostMapping("/api/v1/task/add")
public ResponseResult addTask(@RequestBody Task task);

/**
* 取消任务
* @param taskId 任务id
* @return 取消结果
*/
@GetMapping("/api/v1/task/cancel/{taskId}")
public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/task/poll/{type}/{priority}")
public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
}

在heima-leadnews-schedule微服务下提供对应的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.heima.schedule.feign;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


@RestController
public class ScheduleClient implements IScheduleClient {

@Autowired
private TaskService taskService;

/**
* 添加任务
* @param task 任务对象
* @return 任务id
*/
@PostMapping("/api/v1/task/add")
@Override
public ResponseResult addTask(@RequestBody Task task) {
return ResponseResult.okResult(taskService.addTask(task));
}

/**
* 取消任务
* @param taskId 任务id
* @return 取消结果
*/
@GetMapping("/api/v1/task/cancel/{taskId}")
@Override
public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
return ResponseResult.okResult(taskService.cancelTask(taskId));
}

/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/task/poll/{type}/{priority}")
@Override
public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
return ResponseResult.okResult(taskService.poll(type,priority));
}
}
5.2)发布文章集成添加延迟队列接口

在创建WmNewsTaskService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.heima.wemedia.service;

import com.heima.model.wemedia.pojos.WmNews;


public interface WmNewsTaskService {

/**
* 添加任务到延迟队列中
* @param id 文章的id
* @param publishTime 发布的时间 可以做为任务的执行时间
*/
public void addNewsToTask(Integer id, Date publishTime);


}

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.heima.wemedia.service.impl;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.enums.TaskTypeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;
import com.heima.wemedia.service.WmNewsTaskService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {


@Autowired
private IScheduleClient scheduleClient;

/**
* 添加任务到延迟队列中
* @param id 文章的id
* @param publishTime 发布的时间 可以做为任务的执行时间
*/
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {

log.info("添加任务到延迟服务中----begin");

Task task = new Task();
task.setExecuteTime(publishTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));

scheduleClient.addTask(task);

log.info("添加任务到延迟服务中----end");

}

}

枚举类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.heima.model.common.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
private final int taskType; //对应具体业务
private final int priority; //业务不同级别
private final String desc; //描述信息
}

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

工具类:

JdkSerializeUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* jdk序列化
*/
public class JdkSerializeUtil {

/**
* 序列化
* @param obj
* @param <T>
* @return
*/
public static <T> byte[] serialize(T obj) {

if (obj == null){
throw new NullPointerException();
}

ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
ObjectOutputStream oos = new ObjectOutputStream(bos);

oos.writeObject(obj);
return bos.toByteArray();
} catch (Exception ex) {
ex.printStackTrace();
}
return new byte[0];
}

/**
* 反序列化
* @param data
* @param clazz
* @param <T>
* @return
*/
public static <T> T deserialize(byte[] data, Class<T> clazz) {
ByteArrayInputStream bis = new ByteArrayInputStream(data);

try {
ObjectInputStream ois = new ObjectInputStream(bis);
T obj = (T)ois.readObject();
return obj;
} catch (Exception ex) {
ex.printStackTrace();
}

return null;
}



}

ProtostuffUtil:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class ProtostuffUtil {

/**
* 序列化
* @param t
* @param <T>
* @return
*/
public static <T> byte[] serialize(T t){
Schema schema = RuntimeSchema.getSchema(t.getClass());
return ProtostuffIOUtil.toByteArray(t,schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));

}

/**
* 反序列化
* @param bytes
* @param c
* @param <T>
* @return
*/
public static <T> T deserialize(byte []bytes,Class<T> c) {
T t = null;
try {
t = c.newInstance();
Schema schema = RuntimeSchema.getSchema(t.getClass());
ProtostuffIOUtil.mergeFrom(bytes,t,schema);
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
return t;
}

/**
* jdk序列化与protostuff序列化对比
* @param args
*/
public static void main(String[] args) {
long start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
JdkSerializeUtil.serialize(wmNews);
}
System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start));

start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
ProtostuffUtil.serialize(wmNews);
}
System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start));
}



}

Protostuff需要引导依赖:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.6.0</version>
</dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Autowired
private WmNewsTaskService wmNewsTaskService;

/**
* 发布修改文章或保存为草稿
* @param dto
* @return
*/
@Override
public ResponseResult submitNews(WmNewsDto dto) {

//0.条件判断
if(dto == null || dto.getContent() == null){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}

//1.保存或修改文章

WmNews wmNews = new WmNews();
//属性拷贝 属性名词和类型相同才能拷贝
BeanUtils.copyProperties(dto,wmNews);
//封面图片 list---> string
if(dto.getImages() != null && dto.getImages().size() > 0){
//[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
String imageStr = StringUtils.join(dto.getImages(), ",");
wmNews.setImages(imageStr);
}
//如果当前封面类型为自动 -1
if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
wmNews.setType(null);
}

saveOrUpdateWmNews(wmNews);

//2.判断是否为草稿 如果为草稿结束当前方法
if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

//3.不是草稿,保存文章内容图片与素材的关系
//获取到文章内容中的图片信息
List<String> materials = ectractUrlInfo(dto.getContent());
saveRelativeInfoForContent(materials,wmNews.getId());

//4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
saveRelativeInfoForCover(dto,wmNews,materials);

//审核文章
// wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());

return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}
5.3)消费任务进行审核文章

WmNewsTaskService中添加方法

1
2
3
4
/**
* 消费延迟队列数据
*/
public void scanNewsByTask();

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
* 消费延迟队列数据
*/
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

log.info("文章审核---消费任务执行---begin---");

ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if(responseResult.getCode().equals(200) && responseResult.getData() != null){
String json_str = JSON.toJSONString(responseResult.getData());
Task task = JSON.parseObject(json_str, Task.class);
byte[] parameters = task.getParameters();
WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
System.out.println(wmNews.getId()+"-----------");
wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
}
log.info("文章审核---消费任务执行---end---");
}

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

文章结束!