业务流程
用户提交任务。首先将任务推送至延迟队列中。
延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
然后生成延迟任务(仅仅包含任务id)放入某个桶中
时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
完成消费后,发送finish消息,服务端根据job id删除对应信息。
对象
延迟队列,为Redis延迟队列。实现消息传递
Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。
任务状态
ready:可执行状态,
delay:不可执行状态,等待时钟周期。
reserved:已被消费者读取,但没有完成消费。
deleted:已被消费完成或者已被删除。
对外提供的接口
接口 | 描述 | 数据 |
---|---|---|
add | 添加任务 | Job数据 |
pop | 取出待处理任务 | topic就是任务分组 |
finish | 完成任务 | 任务ID |
delete | 删除任务 | 任务ID |
额外的内容
首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。
实现
任务及相关对象
@Data @AllArgsConstructor @NoArgsConstructor public class Job implements Serializable { /** * 延迟任务的唯一标识,用于检索任务 */ @JsonSerialize(using = ToStringSerializer.class) private Long id; /** * 任务类型(具体业务类型) */ private String topic; /** * 任务的延迟时间 */ private long delayTime; /** * 任务的执行超时时间 */ private long ttrTime; /** * 任务具体的消息内容,用于处理具体业务逻辑用 */ private String message; /** * 重试次数 */ private int retryCount; /** * 任务状态 */ private JobStatus status; }
@Data @AllArgsConstructor public class DelayJob implements Serializable { /** * 延迟任务的唯一标识 */ private long jodId; /** * 任务的执行时间 */ private long delayDate; /** * 任务类型(具体业务类型) */ private String topic; public DelayJob(Job job) { this.jodId = job.getId(); this.delayDate = System.currentTimeMillis() + job.getDelayTime(); this.topic = job.getTopic(); } public DelayJob(Object value, Double score) { this.jodId = Long.parseLong(String.valueOf(value)); this.delayDate = System.currentTimeMillis() + score.longValue(); } }
容器
@Component @Slf4j public class JobPool { @Autowired private RedisTemplate redisTemplate; private String NAME = "job.pool"; private BoundHashOperations getPool () { BoundHashOperations ops = redisTemplate.boundHashOps(NAME); return ops; } /** * 添加任务 * @param job */ public void addJob (Job job) { log.info("任务池添加任务:{}", JSON.toJSONString(job)); getPool().put(job.getId(),job); return ; } /** * 获得任务 * @param jobId * @return */ public Job getJob(Long jobId) { Object o = getPool().get(jobId); if (o instanceof Job) { return (Job) o; } return null; } /** * 移除任务 * @param jobId */ public void removeDelayJob (Long jobId) { log.info("任务池移除任务:{}",jobId); // 移除任务 getPool().delete(jobId); } }
@Slf4j @Component public class DelayBucket { @Autowired private RedisTemplate redisTemplate; private static AtomicInteger index = new AtomicInteger(0); @Value("${thread.size}") private int bucketsSize; private List <String> bucketNames = new ArrayList <>(); @Bean public List <String> createBuckets() { for (int i = 0; i < bucketsSize; i++) { bucketNames.add("bucket" + i); } return bucketNames; } /** * 获得桶的名称 * @return */ private String getThisBucketName() { int thisIndex = index.addAndGet(1); int i1 = thisIndex % bucketsSize; return bucketNames.get(i1); } /** * 获得桶集合 * @param bucketName * @return */ private BoundZSetOperations getBucket(String bucketName) { return redisTemplate.boundZSetOps(bucketName); } /** * 放入延时任务 * @param job */ public void addDelayJob(DelayJob job) { log.info("添加延迟任务:{}", JSON.toJSONString(job)); String thisBucketName = getThisBucketName(); BoundZSetOperations bucket = getBucket(thisBucketName); bucket.add(job,job.getDelayDate()); } /** * 获得最新的延期任务 * @return */ public DelayJob getFirstDelayTime(Integer index) { String name = bucketNames.get(index); BoundZSetOperations bucket = getBucket(name); Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1); if (CollectionUtils.isEmpty(set)) { return null; } ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0]; Object value = typedTuple.getValue(); if (value instanceof DelayJob) { return (DelayJob) value; } return null; } /** * 移除延时任务 * @param index * @param delayJob */ public void removeDelayTime(Integer index,DelayJob delayJob) { String name = bucketNames.get(index); BoundZSetOperations bucket = getBucket(name); bucket.remove(delayJob); } }
@Component @Slf4j public class ReadyQueue { @Autowired private RedisTemplate redisTemplate; private String NAME = "process.queue"; private String getKey(String topic) { return NAME + topic; } /** * 获得队列 * @param topic * @return */ private BoundListOperations getQueue (String topic) { BoundListOperations ops = redisTemplate.boundListOps(getKey(topic)); return ops; } /** * 设置任务 * @param delayJob */ public void pushJob(DelayJob delayJob) { log.info("执行队列添加任务:{}",delayJob); BoundListOperations listOperations = getQueue(delayJob.getTopic()); listOperations.leftPush(delayJob); } /** * 移除并获得任务 * @param topic * @return */ public DelayJob popJob(String topic) { BoundListOperations listOperations = getQueue(topic); Object o = listOperations.leftPop(); if (o instanceof DelayJob) { log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o)); return (DelayJob) o; } return null; }
轮询处理
@Component public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> { @Autowired private DelayBucket delayBucket; @Autowired private JobPool jobPool; @Autowired private ReadyQueue readyQueue; @Value("${thread.size}") private int length; @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { ExecutorService executorService = new ThreadPoolExecutor( length, length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); for (int i = 0; i < length; i++) { executorService.execute( new DelayJobHandler( delayBucket, jobPool, readyQueue, i)); } } }
测试请求
/** * 测试用请求 * @author daify * @date 2019-07-29 10:26 **/ @RestController @RequestMapping("delay") public class DelayController { @Autowired private JobService jobService; /** * 添加 * @param request * @return */ @RequestMapping(value = "add",method = RequestMethod.POST) public String addDefJob(Job request) { DelayJob delayJob = jobService.addDefJob(request); return JSON.toJSONString(delayJob); } /** * 获取 * @return */ @RequestMapping(value = "pop",method = RequestMethod.GET) public String getProcessJob(String topic) { Job process = jobService.getProcessJob(topic); return JSON.toJSONString(process); } /** * 完成一个执行的任务 * @param jobId * @return */ @RequestMapping(value = "finish",method = RequestMethod.DELETE) public String finishJob(Long jobId) { jobService.finishJob(jobId); return "success"; } @RequestMapping(value = "delete",method = RequestMethod.DELETE) public String deleteJob(Long jobId) { jobService.deleteJob(jobId); return "success"; } }
测试
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000} 2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"} 2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000} 2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"} 2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3 2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}