Skip to content

Commit 784db24

Browse files
committed
doc:完善README
1 parent e2f4ed5 commit 784db24

File tree

6 files changed

+142
-26
lines changed

6 files changed

+142
-26
lines changed

README.md

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,42 @@
1-
# 基于Redis实现延时队列
1+
# 基于Redis实现延时队列
2+
3+
## 集成方式
4+
5+
### 1、实现`com.joy.queue.AbstractWaitQueueExecutor`
6+
7+
```java
8+
9+
@Setter
10+
@Getter
11+
@Slf4j
12+
@Component
13+
public class DefaultWaitQueueExecutor extends AbstractWaitQueueExecutor {
14+
15+
private BiFunction<String, String, Boolean> consumer;
16+
17+
public DefaultWaitQueueExecutor(ThreadPoolTaskExecutor taskExecutor) {
18+
super(taskExecutor);
19+
}
20+
21+
@Override
22+
public boolean processByQueueName(String queueName, String subject, Object extData) {
23+
log.debug("队列运行程序 queueName:{} subject:{}", queueName, subject);
24+
if (null != getConsumer()) {
25+
return getConsumer().apply(queueName, subject);
26+
}
27+
return false;
28+
}
29+
}
30+
```
31+
32+
### 2.调用`com.joy.queue.WaitQueueProvider#enqueue`入列
33+
34+
```java
35+
WaitQueueInfo waitQueueInfo1 = waitQueueProvider.enqueue(QUEUE_NAME, "test1");
36+
```
37+
38+
### 3.调用任意队列名的`com.joy.queue.WaitQueueProvider#query`即可启动队列
39+
40+
```java
41+
WaitQueueInfo waitQueueInfo = waitQueueProvider.query(QUEUE_NAME, "test1");
42+
```

src/main/java/com/joy/queue/AbstractWaitQueueExecutor.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.joy.queue;
22

3+
import lombok.Setter;
34
import lombok.extern.slf4j.Slf4j;
45
import org.springframework.beans.factory.annotation.Qualifier;
56
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -25,8 +26,10 @@ public abstract class AbstractWaitQueueExecutor implements WaitQueueExecutor {
2526

2627
private volatile boolean isRunning = false;
2728

29+
@Setter
2830
private Long waitInterval = 30 * 1000L;
2931

32+
@Setter
3033
private Integer failFitCount = 100;
3134

3235
private Future<?> threadFuture;
@@ -106,7 +109,7 @@ protected void waitSingle() throws InterruptedException {
106109

107110
private void runInternalSingle() throws InterruptedException {
108111
List<String> tmpList = new LinkedList<>(queueSet.keySet());
109-
if (tmpList.size() == 0) {
112+
if (tmpList.isEmpty()) {
110113
this.waitSingle();
111114
return;
112115
}
@@ -159,17 +162,6 @@ protected boolean peekAndProcess(String queueName, CountDownLatch startCount) th
159162
return hasFetch;
160163
}
161164

162-
/**
163-
* 具体业务实现
164-
*
165-
* @param queueName
166-
* @param subject
167-
* @return
168-
*/
169-
public boolean processByQueueName(String queueName, String subject) {
170-
return this.processByQueueName(queueName, subject);
171-
}
172-
173165
public boolean processByQueueName(String queueName, String subject, Object extData) {
174166
return false;
175167
}
@@ -187,27 +179,17 @@ protected void checkFaitCount(String queueName) {
187179
}
188180
}
189181

190-
191182
public Long getWaitInterval() {
192183
if (waitInterval == null || waitInterval <= 0L) {
193184
waitInterval = 5000L;
194185
}
195186
return waitInterval;
196187
}
197188

198-
public void setWaitInterval(Long waitInterval) {
199-
this.waitInterval = waitInterval;
200-
}
201-
202189
public Integer getFailFitCount() {
203190
if (failFitCount == null || failFitCount == 0) {
204191
failFitCount = 10;
205192
}
206193
return failFitCount;
207194
}
208-
209-
public void setFailFitCount(Integer failFitCount) {
210-
this.failFitCount = failFitCount;
211-
}
212-
213195
}

src/main/java/com/joy/queue/DefaultWaitQueueExecutor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,4 @@ public boolean processByQueueName(String queueName, String subject, Object extDa
3131
}
3232
return false;
3333
}
34-
3534
}

src/main/java/com/joy/queue/WaitQueueProvider.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@
77
*/
88
public interface WaitQueueProvider {
99

10+
/**
11+
* 入列
12+
*
13+
* @param queueName 队列名称
14+
* @param subject 排队数据
15+
* @return 排队情况
16+
*/
1017
default WaitQueueInfo enqueue(String queueName, String subject) {
1118
return enqueue(queueName, subject, null);
1219
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<configuration debug="false">
4+
5+
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
6+
7+
<!-- 控制台输出 -->
8+
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
9+
10+
11+
<springProperty name="APP_NAME" scope="context" source="spring.application.name"/>
12+
<springProperty name="LOG_PATH" scope="context" source="${LOG_PATH:-/var/log/delay-queue}"/>
13+
14+
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
15+
<encoder>
16+
<pattern>%clr(%d{${LOG_DATEFORMAT_PATTERN:-HH:mm:ss.SSS}}){faint}%clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(-){faint} %clr([%X{requestId}]){faint} %clr(%-40.40logger{30}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}</pattern>
17+
</encoder>
18+
</appender>
19+
20+
<!-- 按照每天生成日志文件 -->
21+
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
22+
<file>${LOG_PATH}/${APP_NAME}.log</file>
23+
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
24+
<!--日志文件输出的文件名 -->
25+
<fileNamePattern>${LOG_PATH}/${APP_NAME}-%d{yyyyMMdd}-%i.log.gz</fileNamePattern>
26+
<!--日志文件保留天数 -->
27+
<maxHistory>30</maxHistory>
28+
<!--日志文件最大的大小 -->
29+
<maxFileSize>30MB</maxFileSize>
30+
</rollingPolicy>
31+
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
32+
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
33+
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} : %msg%n</pattern>
34+
</encoder>
35+
</appender>
36+
37+
<!-- 预发服配置 -->
38+
<springProfile name="pre">
39+
<root level="INFO">
40+
<appender-ref ref="FILE"/>
41+
<appender-ref ref="CONSOLE"/>
42+
</root>
43+
</springProfile>
44+
45+
<!-- 测试服配置 -->
46+
<springProfile name="test">
47+
<root level="INFO">
48+
<appender-ref ref="FILE"/>
49+
<appender-ref ref="CONSOLE"/>
50+
</root>
51+
</springProfile>
52+
53+
<!-- 开发环境配置 -->
54+
<springProfile name="dev">
55+
<root level="INFO">
56+
<appender-ref ref="CONSOLE"/>
57+
</root>
58+
</springProfile>
59+
60+
<!-- 生产环境配置 -->
61+
<springProfile name="prod">
62+
<root level="INFO">
63+
<appender-ref ref="FILE"/>
64+
<appender-ref ref="CONSOLE"/>
65+
</root>
66+
</springProfile>
67+
</configuration>
Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,47 @@
11
package com.joy;
22

33
import com.joy.entity.WaitQueueInfo;
4+
import com.joy.queue.DefaultWaitQueueExecutor;
45
import com.joy.queue.WaitQueueProvider;
56
import jakarta.annotation.Resource;
7+
import org.junit.jupiter.api.Assertions;
8+
import org.junit.jupiter.api.DisplayName;
9+
import org.junit.jupiter.api.Tag;
610
import org.junit.jupiter.api.Test;
711
import org.springframework.boot.test.context.SpringBootTest;
812

913
/**
1014
* @author Joy
1115
*/
1216
@SpringBootTest
17+
@DisplayName("延迟队列测试")
1318
public class DelayQueueApplicationTest {
1419
@Resource
1520
private WaitQueueProvider waitQueueProvider;
21+
@Resource
22+
private DefaultWaitQueueExecutor defaultWaitQueueExecutor;
1623

1724
private static final String QUEUE_NAME = "test_queue";
1825

19-
2026
@Test
21-
public void test() {
27+
@Tag("入列测试")
28+
public void testEnqueue() {
2229
WaitQueueInfo waitQueueInfo1 = waitQueueProvider.enqueue(QUEUE_NAME, "test1");
2330
WaitQueueInfo waitQueueInfo2 = waitQueueProvider.enqueue(QUEUE_NAME, "test2");
2431
WaitQueueInfo waitQueueInfo3 = waitQueueProvider.enqueue(QUEUE_NAME, "test3");
32+
Assertions.assertNotNull(waitQueueInfo1);
33+
Assertions.assertNotNull(waitQueueInfo2);
34+
Assertions.assertNotNull(waitQueueInfo3);
35+
}
2536

37+
@Test
38+
@DisplayName("队列启动测试")
39+
public void testQuery() {
40+
defaultWaitQueueExecutor.setConsumer((queueName, subject) -> {
41+
System.out.println("queueName:" + queueName + " subject:" + subject);
42+
return true;
43+
});
44+
WaitQueueInfo waitQueueInfo = waitQueueProvider.query(QUEUE_NAME, "test1");
45+
Assertions.assertNotNull(waitQueueInfo);
2646
}
2747
}

0 commit comments

Comments
 (0)