实现即时消息的方法有很多种比如websocket,sse; 而sse 又有spring mvc 实现的也有webflux 实现的。mvc实现的网上已经有很多了,而webflux 实现的不是很多,也不是很全,因此本文主要做的是webflux 实现的即时消息,sse 这里不多讲,如果有不理解的可以自行百度,谷歌。
maven 依赖在最下面
下面是最简单的实现也是应用场景最少的实现
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux> sse(@PathVariable String userId) {
// 每两秒推送一次
return Flux.interval(Duration.ofSeconds(2)).map(seq->
Tuples.of(seq, LocalDateTime.now())).log()
.map(data-> ServerSentEvent.builder().id("1").data(data.getT2().toString()).build());
}
上面的适合股票之类的,周期性的消息。比如每两秒发送一次消息;这样的场景是合适的,但是如果是非周期性的消息呢?比如我需要再应用里发一个公告,这个公告是突然的,不确定的,那么这个逻辑就不合适了。
下面介绍非周期性消息
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/sse")
public class MessageController implements ApplicationListener {
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@GetMapping(value = "/message",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public Flux getMessage(){
return Flux.create(stringFluxSink -> {
MessageHandler messageHandler = message -> stringFluxSink.next(String.class.cast(message.getPayload()));
// 用户断开的时候取消订阅
stringFluxSink.onCancel(()->subscribableChannel.unsubscribe(messageHandler));
// 订阅消息
subscribableChannel.subscribe(messageHandler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
subscribableChannel.send(new GenericMessage(event.getSource()));
}
@PostMapping("/publish")
public void publish(@RequestParam String message){
subscribableChannel.send(new GenericMessage(message));
}
}
这里有个局限性 就是单服务的消息,那如果是多服务的集群消息怎么解决呢?
下面代码是使用redis 的发布订阅模式来实现webflux 的sse 集群
import indi.houhaoran.webflux.domian.MessageDTO;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author haoran
*/
@RestController
@RequestMapping("/flux")
@RequiredArgsConstructor
public class FluxMessageController {
private final RedissonClient redissonClient;
public static final String USER_TOPIC = "user:";
public static final String BROADCAST_TOPIC = "broadcast_topic";
@GetMapping(path = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux getFolderWatch(@PathVariable String userId) {
return Flux.create(sink -> {
// 订阅 广播
redissonClient.getTopic(BROADCAST_TOPIC).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
// 监听 用户主题 单个
redissonClient.getTopic(USER_TOPIC + userId).addListener(MessageDTO.class, (c, m) -> {
sink.next(m);
});
//加入监听如果断开链接就移除redis 的订阅
sink.onCancel(() -> {
// 断开移除
System.out.println("退出 userId:" + userId);
redissonClient.getTopic(USER_TOPIC + userId).removeAllListeners();
redissonClient.getTopic(BROADCAST_TOPIC).removeListener((Integer) redissonClient.getMap(BROADCAST_TOPIC).get(userId));
});
}, FluxSink.OverflowStrategy.LATEST);
}
@PostMapping("/publish")
public void publish(@RequestBody MessageDTO messageDTO) {
redissonClient.getTopic(BROADCAST_TOPIC).publish(messageDTO);
}
}
redisson 配置
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 这个地方不可使用 json 序列化,否则会有问题,会出现一个 java.lang.IllegalArgumentException: Value must not be null! 错误
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
}
@Slf4j
@Configuration
public class RedissonConfigure {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://localhost:6379");
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
config.setCodec(new JsonJacksonCodec(objectMapper));
return Redisson.create(config);
}
}
其他类
import java.io.Serializable;
/**
* @author haoran
*/
@Data
public class MessageDTO implements Serializable {
private String message;
}
调试:
由此可见当我从8080 服务发送消息,8080,8081两个服务都接收到消息了
maven 依赖
webfluxdemo
org.example
1.0-SNAPSHOT
4.0.0
server
org.springframework.boot
spring-boot-starter-webflux
org.projectlombok
lombok
1.18.22
de.ruedigermoeller
fst
2.57
org.apache.commons
commons-pool2
org.redisson
redisson-spring-boot-starter
3.17.0
org.springframework.boot
spring-boot-starter-integration
org.springframework.boot
spring-boot-maven-plugin
父pom
org.example
webfluxdemo
pom
1.0-SNAPSHOT
org.springframework.boot
spring-boot-starter-parent
2.6.4
client
server
RxJava
org.springframework.boot
spring-boot-starter-webflux
org.springframework.boot
spring-boot-maven-plugin
spring-snapshots
https://repo.spring.io/snapshot
spring-milestones
https://repo.spring.io/milestone
参考·1 Reactor 3 参考文档 (htmlpreview.github.io)
参考·2 https://www.lefer.cn/posts/30624/
结语:百度真垃圾,查了半天也没找到,终归要google;本文只是简单的实现了sse 在真实场景下会有很多不足,比如redis 加入订阅的是通过lamda 表达式实现的,这里最好有个实现类来实现订阅发送消息的业务。
题外话:webflux 如何实现响应式报表?