优化在线用户缓存逻辑

This commit is contained in:
easonzhu 2025-02-20 10:34:49 +08:00
parent 9bb59f448f
commit a60a467694
13 changed files with 286 additions and 203 deletions

View File

@ -24,11 +24,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>

View File

@ -0,0 +1,71 @@
package com.upchina.common.config.cache;
import com.hazelcast.config.InMemoryFormat;
import java.util.HashMap;
import java.util.Map;
import static com.upchina.common.config.cache.CacheKey.*;
public class CacheConfig {
public static final String DEFAULT_MAP_NAME = "default";
public static class LocalMapConfig {
public final int maxSize;
public final int liveSeconds;
public final InMemoryFormat inMemoryFormat;
LocalMapConfig(int maxSize, int liveSeconds) {
this.maxSize = maxSize;
this.liveSeconds = liveSeconds;
this.inMemoryFormat = InMemoryFormat.BINARY;
}
LocalMapConfig(int maxSize, int liveSeconds, InMemoryFormat inMemoryFormat) {
this.maxSize = maxSize;
this.liveSeconds = liveSeconds;
this.inMemoryFormat = inMemoryFormat;
}
}
public static Map<String, LocalMapConfig> getConfigMap() {
// 设置近地缓存实时同步,不采用批量提交策略
System.setProperty("hazelcast.map.invalidation.batch.enabled", "false");
// PER_NODEmax-size指定单个集群成员中map条目的最大数量这是max-size的默认策略如果使用这个配置需要注意max-size的值必须大于分区的数量默认为271
// PER_PARTITIONmax-size指定每个分区存储的map条目最大数这个策略建议不要在小规模的集群中使用因为小规模的集群单个节点包含了大量的分区在执行回收策略时会去按照分区的划分组个检查回收条件导致效率低下
// USED_HEAP_SIZE指在每个Hazelcast实例中max-size指定map所占用的内存堆的以megabytes计算兆字节最大值需要注意这个策略不能工作在in-memory-format=OBJECT因为当数据被设置为OBJECT时无法确定所占用的内存大小
// USED_HEAP_PERCENTAGE每个Hazelcast实例中max-size指定map占用内存堆的百分比例如JVM被设置有1000MB而这个值设置为max-size=10当map条目数占用的堆数据超过100MB时Hazelcast开始执行数据释放工作需要注意的是当使用这个策略时不能将in-memory-format设置为OBJECT理由同上
// FREE_HEAP_SIZEmax-size指定了单个JVM的堆最小空闲空间单位为megabytes
// FREE_HEAP_PERCENTAGEmax-size指定单个JVM的最小空闲空间的百分比例如JVM分配了1000MB的空间这个值设置为10当空闲堆只有100MB时会引发map的数据清除放行为
// 当map条数超过10000会引发map的数据清除行为
Map<String, LocalMapConfig> configMap = new HashMap<>();
configMap.put(DISTRIBUTED_LOCK, new LocalMapConfig(1000, 0));
configMap.put(DEPT, new LocalMapConfig(10000, 3600));
configMap.put(TAG, new LocalMapConfig(10000, 3600));
configMap.put(ADVISOR_INFO, new LocalMapConfig(10000, 300));
configMap.put(RECOMMEND, new LocalMapConfig(10000, 300));
configMap.put(USER, new LocalMapConfig(10000, 3600));
configMap.put(CAPTCHA, new LocalMapConfig(10000, 300));
configMap.put(RBAC, new LocalMapConfig(10000, 300));
configMap.put(URL_MAP, new LocalMapConfig(10000, 300, InMemoryFormat.OBJECT));
configMap.put(SCREEN, new LocalMapConfig(1000, 10, InMemoryFormat.OBJECT));
configMap.put(VIDEO_TX_ONLINE, new LocalMapConfig(1000, 20, InMemoryFormat.OBJECT));
configMap.put(VIDEO_LIVE_MESSAGE, new LocalMapConfig(10000, 86400));
configMap.put(VIDEO_LIVE, new LocalMapConfig(10000, 300));
configMap.put(VIDEO_LIVE_DELAY, new LocalMapConfig(10000, 10));
configMap.put(VIDEO_LIVE_COLUMN, new LocalMapConfig(10000, 300));
configMap.put(VIDEO_ACTIVITY, new LocalMapConfig(1000, 300));
configMap.put(VIDEO_LIVE_LIBRARY, new LocalMapConfig(1000, 300));
configMap.put(VIDEO_LIVE_USER_MAP, new LocalMapConfig(10000, 300));
configMap.put(CUSTOMER_MAP, new LocalMapConfig(10000, 3600));
configMap.put(QUESTION, new LocalMapConfig(1000, 30));
configMap.put(COURSE, new LocalMapConfig(10000, 300));
configMap.put(GROUP, new LocalMapConfig(10000, 300));
return configMap;
}
}

View File

@ -358,4 +358,8 @@ public class CacheKey {
public static final String GROUP_MESSAGE_DATE_ID_MAP = "group_message_date_id_map|";
}
public static final String GROUP_ONLINE_USER = "group_online_user";
public static final String VIDEO_ONLINE_USER = "video_online_user";
}

View File

@ -8,12 +8,9 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.upchina.common.config.cache.CacheKey.*;
@Configuration
public class HazelcastConfiguration {
@ -23,67 +20,6 @@ public class HazelcastConfiguration {
@Value("${hazelcast.serverPort}")
private Integer serverPort;
private static final String DEFAULT_MAP_NAME = "default";
private static final Map<String, LocalMapConfig> configMap = new HashMap<>();
public static class LocalMapConfig {
private final int maxSize;
private final int liveSeconds;
private final InMemoryFormat inMemoryFormat;
LocalMapConfig(int maxSize, int liveSeconds) {
this.maxSize = maxSize;
this.liveSeconds = liveSeconds;
this.inMemoryFormat = InMemoryFormat.BINARY;
}
LocalMapConfig(int maxSize, int liveSeconds, InMemoryFormat inMemoryFormat) {
this.maxSize = maxSize;
this.liveSeconds = liveSeconds;
this.inMemoryFormat = inMemoryFormat;
}
}
static {
// 设置近地缓存实时同步,不采用批量提交策略
System.setProperty("hazelcast.map.invalidation.batch.enabled", "false");
// PER_NODEmax-size指定单个集群成员中map条目的最大数量这是max-size的默认策略如果使用这个配置需要注意max-size的值必须大于分区的数量默认为271
// PER_PARTITIONmax-size指定每个分区存储的map条目最大数这个策略建议不要在小规模的集群中使用因为小规模的集群单个节点包含了大量的分区在执行回收策略时会去按照分区的划分组个检查回收条件导致效率低下
// USED_HEAP_SIZE指在每个Hazelcast实例中max-size指定map所占用的内存堆的以megabytes计算兆字节最大值需要注意这个策略不能工作在in-memory-format=OBJECT因为当数据被设置为OBJECT时无法确定所占用的内存大小
// USED_HEAP_PERCENTAGE每个Hazelcast实例中max-size指定map占用内存堆的百分比例如JVM被设置有1000MB而这个值设置为max-size=10当map条目数占用的堆数据超过100MB时Hazelcast开始执行数据释放工作需要注意的是当使用这个策略时不能将in-memory-format设置为OBJECT理由同上
// FREE_HEAP_SIZEmax-size指定了单个JVM的堆最小空闲空间单位为megabytes
// FREE_HEAP_PERCENTAGEmax-size指定单个JVM的最小空闲空间的百分比例如JVM分配了1000MB的空间这个值设置为10当空闲堆只有100MB时会引发map的数据清除放行为
// 当map条数超过10000会引发map的数据清除行为
configMap.put(DISTRIBUTED_LOCK, new LocalMapConfig(1000, 0));
configMap.put(DEPT, new LocalMapConfig(10000, 3600));
configMap.put(TAG, new LocalMapConfig(10000, 3600));
configMap.put(ADVISOR_INFO, new LocalMapConfig(10000, 300));
configMap.put(RECOMMEND, new LocalMapConfig(10000, 300));
configMap.put(USER, new LocalMapConfig(10000, 3600));
configMap.put(CAPTCHA, new LocalMapConfig(10000, 300));
configMap.put(RBAC, new LocalMapConfig(10000, 300));
configMap.put(URL_MAP, new LocalMapConfig(10000, 300, InMemoryFormat.OBJECT));
configMap.put(SCREEN, new LocalMapConfig(1000, 10, InMemoryFormat.OBJECT));
configMap.put(VIDEO_TX_ONLINE, new LocalMapConfig(1000, 20, InMemoryFormat.OBJECT));
configMap.put(VIDEO_LIVE_MESSAGE, new LocalMapConfig(10000, 86400));
configMap.put(VIDEO_LIVE, new LocalMapConfig(10000, 300));
configMap.put(VIDEO_LIVE_DELAY, new LocalMapConfig(10000, 10));
configMap.put(VIDEO_LIVE_COLUMN, new LocalMapConfig(10000, 300));
configMap.put(VIDEO_ACTIVITY, new LocalMapConfig(1000, 300));
configMap.put(VIDEO_LIVE_LIBRARY, new LocalMapConfig(1000, 300));
configMap.put(VIDEO_LIVE_USER_MAP, new LocalMapConfig(10000, 300));
configMap.put(CUSTOMER_MAP, new LocalMapConfig(10000, 3600));
configMap.put(QUESTION, new LocalMapConfig(1000, 30));
configMap.put(COURSE, new LocalMapConfig(10000, 300));
configMap.put(GROUP, new LocalMapConfig(10000, 300));
}
@Bean
public Config hazelCastConfig() {
List<String> memberList = Arrays.asList(members.split(","));
@ -99,7 +35,7 @@ public class HazelcastConfiguration {
String instanceName = clusterName + "." + "localIP";
config.setInstanceName(instanceName);
config.setClusterName(clusterName);
for (Map.Entry<String, LocalMapConfig> entry : configMap.entrySet()) {
for (Map.Entry<String, CacheConfig.LocalMapConfig> entry : CacheConfig.getConfigMap().entrySet()) {
config.addMapConfig(new MapConfig()
.setName(entry.getKey())
.setEvictionConfig(new EvictionConfig()
@ -124,7 +60,7 @@ public class HazelcastConfiguration {
// 默认map配置主要用于USER_VIDEO_TOTAL_ONLINE + videoId
config.addMapConfig(new MapConfig()
.setName(DEFAULT_MAP_NAME)
.setName(CacheConfig.DEFAULT_MAP_NAME)
.setNearCacheConfig(new NearCacheConfig()
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setCacheLocalEntries(true)

View File

@ -7,6 +7,7 @@ import com.upchina.common.entity.OnlineUser;
import com.upchina.common.vo.FrontUserVO;
import com.upchina.group.service.common.GroupCacheService;
import com.upchina.group.service.common.GroupMessageService;
import com.upchina.video.helper.VideoHelper;
import com.upchina.video.service.common.VideoCacheService;
import com.upchina.video.service.common.VideoMessageService;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
@ -88,8 +89,9 @@ public class WebSocketSessionHandler {
private void updateOnlineStatus(Integer productType, Integer productId, String sessionKey, OnlineUser onlineUser) {
IMap<String, OnlineUser> totalOnlineMap = getTotalOnlineMap(productType, productId);
String cacheKey = VideoHelper.buildOnlineUserCacheKey(productId, sessionKey);
if (totalOnlineMap != null) {
totalOnlineMap.put(sessionKey, onlineUser);
totalOnlineMap.put(cacheKey, onlineUser);
}
}
@ -103,10 +105,11 @@ public class WebSocketSessionHandler {
Integer productId = sessionInfo.getProductId();
String sessionKey = sessionInfo.getSessionKey();
IMap<String, OnlineUser> totalOnlineMap = getTotalOnlineMap(productType, productId);
OnlineUser onlineUser = totalOnlineMap.get(sessionKey);
String cacheKey = VideoHelper.buildOnlineUserCacheKey(productId, sessionKey);
OnlineUser onlineUser = totalOnlineMap.get(cacheKey);
if (onlineUser != null) {
updateOfflineStatus(onlineUser);
totalOnlineMap.put(sessionKey, onlineUser);
totalOnlineMap.put(cacheKey, onlineUser);
memberNotify(productType, productId, onlineUser);
}
}
@ -119,9 +122,9 @@ public class WebSocketSessionHandler {
private IMap<String, OnlineUser> getTotalOnlineMap(Integer productType, Integer productId) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
return videoCacheService.getTotalOnlineMap(productId);
return videoCacheService.getTotalOnlineMap();
} else if (ProductType.GROUP.value.equals(productType)) {
return groupCacheService.getTotalOnlineMap(productId);
return groupCacheService.getTotalOnlineMap();
}
return null;
}

View File

@ -4,18 +4,20 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.upchina.advisor.vo.AdvisorBasicVO;
import com.upchina.common.config.cache.CacheKey;
import com.upchina.common.constant.IsOrNot;
import com.upchina.common.entity.OnlineUser;
import com.upchina.common.service.CacheService;
import com.upchina.common.util.logger.LoggerUtil;
import com.upchina.common.vo.DateIdVO;
import com.upchina.group.constant.GroupInteractiveType;
import com.upchina.group.constant.GroupMessageUserType;
import com.upchina.group.constant.QueryGroupMessageType;
import com.upchina.group.entity.GroupMessage;
import com.upchina.group.entity.GroupMessageRead;
import com.upchina.group.entity.GroupUserFlow;
import com.upchina.group.mapper.GroupMessageMapper;
import com.upchina.group.mapper.GroupMessageReadMapper;
import com.upchina.group.mapper.GroupUserFlowMapper;
@ -26,8 +28,10 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import static com.upchina.common.config.cache.CacheKey.GROUP_ONLINE_USER;
import static com.upchina.common.config.cache.CacheKey.VIDEO_LIVE;
@Service
public class GroupCacheService {
@ -159,16 +163,80 @@ public class GroupCacheService {
return vo;
}
public IMap<String, OnlineUser> getTotalOnlineMap(Integer groupId) {
return cacheService.getMap(CacheKey.GroupKey.USER_TOTAL_ONLINE + groupId, () -> {
synchronized (GroupCacheService.class) {
List<OnlineUser> hisList = groupUserFlowMapper.loadHis(Wrappers.<GroupUserFlow>lambdaQuery()
.eq(GroupUserFlow::getGroupId, groupId)
.groupBy(GroupUserFlow::getUserId, GroupUserFlow::getSessionId));
return hisList.stream()
.collect(Collectors.toMap(h -> h.getUserId() + "-" + h.getSessionId(), Function.identity()));
/**
* 获取总在线用户映射
*
* @return 在线用户映射
*/
public IMap<String, OnlineUser> getTotalOnlineMap() {
return hazelcastInstance.getMap(GROUP_ONLINE_USER);
}
public List<OnlineUser> getTotalOnlineList(Integer videoId) {
String sql = "SELECT * FROM " + GROUP_ONLINE_USER + " WHERE productId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
List<OnlineUser> list = new ArrayList<>();
for (SqlRow row : result) {
OnlineUser user = new OnlineUser();
user.setProductType(row.getObject("productType"));
user.setProductId(row.getObject("productId"));
user.setUserId(row.getObject("userId"));
user.setUserName(row.getObject("userName"));
user.setImg(row.getObject("img"));
user.setSessionId(row.getObject("sessionId"));
user.setIsOnline(row.getObject("isOnline"));
user.setIsPlay(row.getObject("isPlay"));
user.setCreateTime(row.getObject("createTime"));
user.setExitTime(row.getObject("exitTime"));
list.add(user);
}
return list;
}
public Set<String> getOnlineUserIds(Integer videoId) {
String sql = "SELECT distinct userId FROM " + GROUP_ONLINE_USER + " WHERE videoId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
Set<String> set = new HashSet<>();
for (SqlRow row : result) {
set.add(row.getObject("userId"));
}
return set;
}
/**
* 获取在线人数
*
* @param videoId 视频ID
* @return 在线人数
*/
public int getOnlineCount(Integer videoId) {
IMap<String, Object> map = hazelcastInstance.getMap(VIDEO_LIVE);
String cacheKey = CacheKey.VideoLiveKey.ONLINE_COUNT + videoId;
Integer onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
synchronized (this) {
onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
});
long startTime = System.currentTimeMillis();
onlineCount = getOnlineCountWithoutCache(videoId);
LoggerUtil.websocket.info("getOnlineCount-" + videoId + ":" + (System.currentTimeMillis() - startTime) + "ms");
map.put(cacheKey, onlineCount, 2, TimeUnit.SECONDS);
}
return onlineCount;
}
public int getOnlineCountWithoutCache(Integer videoId) {
String sql = "SELECT COUNT(*) FROM " + GROUP_ONLINE_USER + " WHERE videoId = " + videoId + " and isOnline = 1";
SqlResult result = hazelcastInstance.getSql().execute(sql);
Iterator<SqlRow> iter = result.iterator();
if (iter.hasNext()) {
return ((Long)iter.next().getObject(0)).intValue();
}
return 0;
}
private static String buildMessageIdSetKey(String prefix, String userId, QueryGroupMessageType type) {

View File

@ -105,40 +105,32 @@ public class GroupCommonService {
*/
@Transactional(rollbackFor = Exception.class)
public void saveGroupUser() {
LocalDateTime now = LocalDateTime.now();
LambdaQueryWrapper<GroupInfo> wrapper = Wrappers.<GroupInfo>lambdaQuery()
.select(GroupInfo::getId, GroupInfo::getStatus);
List<GroupInfo> groups = groupInfoMapper.selectList(wrapper);
if (CollUtil.isEmpty(groups)) {
IMap<String, OnlineUser> onlineMap = groupCacheService.getTotalOnlineMap();
Collection<OnlineUser> onlineUsers = onlineMap.values();
if (CollUtil.isEmpty(onlineUsers)) {
return;
}
LocalDateTime now = LocalDateTime.now();
LocalDateTime time = now.withSecond(0).withNano(0);
LocalDateTime startTime = now.plusMinutes(-1);
groups.forEach(g -> {
Integer id = g.getId();
IMap<String, OnlineUser> onlineMap = groupCacheService.getTotalOnlineMap(id);
if (onlineMap == null || onlineMap.isEmpty()) {
return;
List<GroupUserFlow> onLineList = onlineUsers.stream().map(user -> {
Integer groupId = user.getProductId();
if (IsOrNot.IS.value.equals(user.getIsOnline()) || (user.getExitTime() != null && user.getExitTime().isAfter(startTime))) {
GroupUserFlow groupUserFlow = new GroupUserFlow();
groupUserFlow.setGroupId(groupId);
groupUserFlow.setUserId(user.getUserId());
groupUserFlow.setSessionId(user.getSessionId());
groupUserFlow.setTime(time);
groupUserFlow.setEnterTime(user.getCreateTime() != null ? user.getCreateTime().withSecond(0).withNano(0) : null);
groupUserFlow.setExitTime(user.getExitTime() != null ? user.getExitTime().withSecond(0).withNano(0) : null);
return groupUserFlow;
}
// 将实时在线人落库并添加是否正在观看
List<GroupUserFlow> onLineList = onlineMap.values().stream()
.filter(u -> IsOrNot.IS.value.equals(u.getIsOnline()) || (u.getExitTime() != null && u.getExitTime().isAfter(startTime)))
.map(o -> {
GroupUserFlow groupUserFlow = new GroupUserFlow();
groupUserFlow.setGroupId(id);
groupUserFlow.setUserId(o.getUserId());
groupUserFlow.setSessionId(o.getSessionId());
groupUserFlow.setTime(time);
groupUserFlow.setEnterTime(o.getCreateTime() != null ? o.getCreateTime().withSecond(0).withNano(0) : null);
groupUserFlow.setExitTime(o.getExitTime() != null ? o.getExitTime().withSecond(0).withNano(0) : null);
return groupUserFlow;
})
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(onLineList)) {
ListUtil.split(onLineList, 1000).stream().filter(CollUtil::isNotEmpty).forEach(groupUserFlowMapper::insertBatchSomeColumn);
}
});
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
// 将实时在线人落库并添加是否正在观看
if (CollUtil.isNotEmpty(onLineList)) {
ListUtil.split(onLineList, 1000).forEach(groupUserFlowMapper::insertBatchSomeColumn);
}
}
public void collectGroupData() {

View File

@ -145,4 +145,8 @@ public class VideoHelper {
return liveHours;
}
public static String buildOnlineUserCacheKey(Integer productId, String sessionKey) {
return productId + "-" + sessionKey;
}
}

View File

@ -563,46 +563,43 @@ public class AdminVideoInteractionService {
*/
@Transactional(rollbackFor = Exception.class)
public void saveVideoUserDataToDB() {
LocalDateTime now = LocalDateTime.now();
LambdaQueryWrapper<VideoLive> wrapper = Wrappers.<VideoLive>lambdaQuery()
.select(VideoLive::getId, VideoLive::getLiveStatus)
.ge(VideoLive::getStartTime, now.plusDays(-7));
List<VideoLive> videoLives = videoLiveMapper.selectList(wrapper);
if (CollUtil.isEmpty(videoLives)) {
IMap<String, OnlineUser> onlineMap = videoCacheService.getTotalOnlineMap();
Collection<OnlineUser> onlineUsers = onlineMap.values();
if (CollUtil.isEmpty(onlineUsers)) {
return;
}
Set<Integer> videoIds = onlineMap.values().stream().map(OnlineUser::getProductId).collect(Collectors.toSet());
LocalDateTime now = LocalDateTime.now();
List<VideoLive> videoLives = videoLiveMapper.selectList(Wrappers.<VideoLive>lambdaQuery().in(VideoLive::getId, videoIds));
Map<Integer, Integer> videoStatusMap = videoLives.stream().collect(Collectors.toMap(VideoLive::getId, VideoLive::getLiveStatus));
LocalDateTime time = now.withSecond(0).withNano(0);
LocalDateTime startTime = now.plusMinutes(-1);
videoLives.forEach(v -> {
Integer videoId = v.getId();
Integer liveStatus = v.getLiveStatus();
boolean calIsPlay = Objects.equals(VideoLiveStatus.LIVING.value, liveStatus) || Objects.equals(VideoLiveStatus.HAS_ENDED.value, liveStatus);
IMap<String, OnlineUser> onlineMap = videoCacheService.getTotalOnlineMap(videoId);
if (onlineMap == null || onlineMap.isEmpty()) {
return;
List<VideoUserFlow> onLineList = onlineUsers.stream().map(user -> {
Integer videoId = user.getProductId();
Integer liveStatus = videoStatusMap.get(videoId);
if (liveStatus == null) {
return null;
}
// 将实时在线人落库并添加是否正在观看
List<VideoUserFlow> onLineList = onlineMap.values().stream()
.filter(u -> IsOrNot.IS.value.equals(u.getIsOnline()) || (u.getExitTime() != null && u.getExitTime().isAfter(startTime)))
.map(o -> {
VideoUserFlow videoUserFlow = new VideoUserFlow();
videoUserFlow.setVideoId(videoId);
videoUserFlow.setUserId(o.getUserId());
videoUserFlow.setSessionId(o.getSessionId());
videoUserFlow.setTime(time);
videoUserFlow.setEnterTime(o.getCreateTime() != null ? o.getCreateTime().withSecond(0).withNano(0) : null);
videoUserFlow.setExitTime(o.getExitTime() != null ? o.getExitTime().withSecond(0).withNano(0) : null);
// 当前用户退出的时候直播处于未开播和暂停状态则不计入观看时长
// 直播中和已结束才计算观看时长
videoUserFlow.setIsPlay(o.getExitTime() != null && calIsPlay ? IsOrNot.IS.value : o.getIsPlay());
return videoUserFlow;
})
.collect(Collectors.toList());
if (CollUtil.isNotEmpty(onLineList)) {
ListUtil.split(onLineList, 1000).forEach(videoUserFlowMapper::insertBatchSomeColumn);
if (IsOrNot.IS.value.equals(user.getIsOnline()) || (user.getExitTime() != null && user.getExitTime().isAfter(startTime))) {
boolean calIsPlay = Objects.equals(VideoLiveStatus.LIVING.value, liveStatus) || Objects.equals(VideoLiveStatus.HAS_ENDED.value, liveStatus);
VideoUserFlow videoUserFlow = new VideoUserFlow();
videoUserFlow.setVideoId(videoId);
videoUserFlow.setUserId(user.getUserId());
videoUserFlow.setSessionId(user.getSessionId());
videoUserFlow.setTime(time);
videoUserFlow.setEnterTime(user.getCreateTime() != null ? user.getCreateTime().withSecond(0).withNano(0) : null);
videoUserFlow.setExitTime(user.getExitTime() != null ? user.getExitTime().withSecond(0).withNano(0) : null);
// 当前用户退出的时候直播处于未开播和暂停状态则不计入观看时长
// 直播中和已结束才计算观看时长
videoUserFlow.setIsPlay(user.getExitTime() != null && calIsPlay ? IsOrNot.IS.value : user.getIsPlay());
return videoUserFlow;
}
});
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
// 将实时在线人落库并添加是否正在观看
if (CollUtil.isNotEmpty(onLineList)) {
ListUtil.split(onLineList, 1000).forEach(videoUserFlowMapper::insertBatchSomeColumn);
}
}
public static class WatchResultVO {

View File

@ -15,7 +15,6 @@ import com.upchina.common.config.cache.CacheKey;
import com.upchina.common.constant.IsOrNot;
import com.upchina.common.constant.ProductType;
import com.upchina.common.constant.UserType;
import com.upchina.common.entity.OnlineUser;
import com.upchina.common.handler.BizException;
import com.upchina.common.result.Pager;
import com.upchina.common.result.ResponseStatus;
@ -643,8 +642,7 @@ public class AdminVideoStatisticService {
}
Page<VideoStatisticUserDetailVO> page = adminVideoInteractionService.selectVideoUserDetail(query.toPage(), wrapper);
List<VideoStatisticUserDetailVO> records = page.getRecords();
List<OnlineUser> onlineList = videoCacheService.getOnlineList(videoId);
Set<String> onlineUserIdSet = onlineList.stream().map(OnlineUser::getUserId).collect(Collectors.toSet());
Set<String> onlineUserIdSet = videoCacheService.getOnlineUserIds(videoId);
Map<Integer, String> userNameMap = userService.getUserMap().values().stream().collect(Collectors.toMap(UserDept::getUserId, UserDept::getName));
Map<String, Integer> readMinuteMap = adminVideoInteractionService.calcuVideoReadMap(videoId, null, null, null);
Map<String, Integer> messageCountMap = adminVideoInteractionService.calUserMessageCount(videoId, null, VideoMessageContentType.TEXT);
@ -1054,15 +1052,9 @@ public class AdminVideoStatisticService {
*/
public UserOnlineVO queryUserOnline(UserOnlineQuery query) {
Integer videoId = query.getVideoId();
List<OnlineUser> onlineList = videoCacheService.getOnlineList(videoId);
Set<String> onlineUserIds = videoCacheService.getOnlineUserIds(videoId);
String userId = query.getUserId();
Integer isOnline;
if (onlineList != null && !onlineList.isEmpty()) {
OnlineUser entry = onlineList.stream().filter(o -> Objects.equals(o.getUserId(), userId)).findAny().orElse(null);
isOnline = entry != null && entry.getIsOnline() != null ? entry.getIsOnline() : IsOrNot.NOT.value;
} else {
isOnline = IsOrNot.NOT.value;
}
Integer isOnline = onlineUserIds.contains(userId) ? IsOrNot.IS.value : IsOrNot.NOT.value;
return new UserOnlineVO(userId, videoId, isOnline);
}

View File

@ -8,6 +8,8 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.hazelcast.collection.ISet;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.upchina.advisor.constant.FollowOption;
import com.upchina.advisor.entity.AdvisorBasic;
import com.upchina.advisor.service.AdvisorInfoService;
@ -55,7 +57,6 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -165,32 +166,41 @@ public class VideoCacheService {
/**
* 获取总在线用户映射
*
* @param videoId 视频ID
* @return 在线用户映射
*/
public IMap<String, OnlineUser> getTotalOnlineMap(Integer videoId) {
return cacheService.getMap(OnlineLineKey.USER_VIDEO_TOTAL_ONLINE + videoId, () -> {
synchronized (VideoCacheService.class) {
List<OnlineUser> hisList = videoUserFlowMapper.loadHis(Wrappers.<VideoUserFlow>query()
.eq("h.video_id", videoId)
.groupBy("h.user_id", "h.session_id"));
return hisList.stream()
.collect(Collectors.toMap(h -> h.getUserId() + "-" + h.getSessionId(), Function.identity()));
}
});
public IMap<String, OnlineUser> getTotalOnlineMap() {
return hazelcastInstance.getMap(VIDEO_ONLINE_USER);
}
/**
* 获取在线用户列表
*
* @param videoId 视频ID
* @return 在线用户列表
*/
public List<OnlineUser> getOnlineList(Integer videoId) {
IMap<String, OnlineUser> map = getTotalOnlineMap(videoId);
return map.values().stream()
.filter(o -> IsOrNot.IS.value.equals(o.getIsOnline()))
.collect(Collectors.toList());
public List<OnlineUser> getTotalOnlineList(Integer videoId) {
String sql = "SELECT * FROM " + VIDEO_ONLINE_USER + " WHERE productId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
List<OnlineUser> list = new ArrayList<>();
for (SqlRow row : result) {
OnlineUser user = new OnlineUser();
user.setProductType(row.getObject("productType"));
user.setProductId(row.getObject("productId"));
user.setUserId(row.getObject("userId"));
user.setUserName(row.getObject("userName"));
user.setImg(row.getObject("img"));
user.setSessionId(row.getObject("sessionId"));
user.setIsOnline(row.getObject("isOnline"));
user.setIsPlay(row.getObject("isPlay"));
user.setCreateTime(row.getObject("createTime"));
user.setExitTime(row.getObject("exitTime"));
list.add(user);
}
return list;
}
public Set<String> getOnlineUserIds(Integer videoId) {
String sql = "SELECT distinct userId FROM " + VIDEO_ONLINE_USER + " WHERE videoId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
Set<String> set = new HashSet<>();
for (SqlRow row : result) {
set.add(row.getObject("userId"));
}
return set;
}
/**
@ -212,13 +222,23 @@ public class VideoCacheService {
return onlineCount;
}
long startTime = System.currentTimeMillis();
onlineCount = getOnlineList(videoId).size();
onlineCount = getOnlineCountWithoutCache(videoId);
LoggerUtil.websocket.info("getOnlineCount-" + videoId + ":" + (System.currentTimeMillis() - startTime) + "ms");
map.put(cacheKey, onlineCount, 10, TimeUnit.SECONDS);
map.put(cacheKey, onlineCount, 2, TimeUnit.SECONDS);
}
return onlineCount;
}
public int getOnlineCountWithoutCache(Integer videoId) {
String sql = "SELECT COUNT(*) FROM " + VIDEO_ONLINE_USER + " WHERE videoId = " + videoId + " and isOnline = 1";
SqlResult result = hazelcastInstance.getSql().execute(sql);
Iterator<SqlRow> iter = result.iterator();
if (iter.hasNext()) {
return ((Long)iter.next().getObject(0)).intValue();
}
return 0;
}
public Integer getVideoSubscribeUserCount(Integer videoId) {
return getVideoUserSubscribeIds(videoId).size();
}

View File

@ -9,7 +9,6 @@ import com.google.common.collect.Table;
import com.upchina.common.config.cache.CacheKey;
import com.upchina.common.constant.IsOrNot;
import com.upchina.common.constant.ProductType;
import com.upchina.common.entity.OnlineUser;
import com.upchina.common.handler.BizException;
import com.upchina.common.result.ResponseStatus;
import com.upchina.common.service.*;
@ -345,8 +344,7 @@ public class VideoCommonService {
try {
Integer videoId = video.getId();
String videoName = video.getTitle();
List<OnlineUser> onlineList = videoCacheService.getOnlineList(videoId);
Set<String> onlineUserIds = onlineList.stream().map(OnlineUser::getUserId).collect(Collectors.toSet());
Set<String> onlineUserIds = videoCacheService.getOnlineUserIds(videoId);
// 查询预约的用户
LambdaQueryWrapper<VideoLiveUser> subWrapper = Wrappers.<VideoLiveUser>lambdaQuery()

View File

@ -1,6 +1,7 @@
package com.upchina.video.service.common;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.google.common.collect.ImmutableSet;
@ -24,7 +25,6 @@ import com.upchina.video.vo.cart.CouponVO;
import com.upchina.video.vo.common.VideoProductInfoVO;
import com.upchina.video.vo.common.VideoWsMessageVO;
import com.upchina.video.vo.message.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
@ -34,7 +34,10 @@ import javax.annotation.Resource;
import java.net.URL;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.upchina.common.config.cache.CacheKey.MessageTopicKey;
@ -324,8 +327,8 @@ public class VideoMessageService {
*/
public void listMember(Integer videoId) {
//获取进入过直播间的人数
IMap<String, OnlineUser> totalOnlineMap = videoCacheService.getTotalOnlineMap(videoId);
int totalCount = totalOnlineMap.size();
List<OnlineUser> totalOnlineList = videoCacheService.getTotalOnlineList(videoId);
int totalCount = totalOnlineList.size();
LoggerUtil.info("获取直播间在线人列表入参:" + videoId);
LoggerUtil.info("直播间id=" + videoId + "总人数:" + totalCount);
VideoCustomerVO videoCustomerVO = new VideoCustomerVO(null, 0, 0);
@ -333,8 +336,7 @@ public class VideoMessageService {
//获取在线人数
int onlineCount = videoCacheService.getOnlineCount(videoId);
LoggerUtil.info("直播间id=" + videoId + "在线人数:" + onlineCount);
List<OnlineUser> onlineUsers = new ArrayList<>(totalOnlineMap.values());
videoCustomerVO = new VideoCustomerVO(onlineUsers, onlineCount, totalCount);
videoCustomerVO = new VideoCustomerVO(totalOnlineList, onlineCount, totalCount);
}
VideoWsMessageVO<VideoCustomerVO> output = VideoWsMessageVO.success(videoId.toString(), videoCustomerVO);
adminUserTopic.publish(output);
@ -462,28 +464,29 @@ public class VideoMessageService {
LoggerUtil.error("reportPlayStatus上报类型错误, reportType:" + reportType);
throw new BizException(ResponseStatus.PARM_ERROR, "上报类型错误");
}
IMap<String, OnlineUser> totalOnlineMap = videoCacheService.getTotalOnlineMap(videoId);
OnlineUser onlineUser = totalOnlineMap.get(sessionKey);
IMap<String, OnlineUser> totalOnlineMap = videoCacheService.getTotalOnlineMap();
String cacheKey = VideoHelper.buildOnlineUserCacheKey(videoId, sessionKey);
OnlineUser onlineUser = totalOnlineMap.get(cacheKey);
if (onlineUser == null) {
LoggerUtil.error("reportPlayStatus在线用户不存在, sessionKey:" + sessionKey);
LoggerUtil.error("reportPlayStatus在线用户不存在, cacheKey:" + cacheKey);
return;
}
LoggerUtil.websocket.info("reportPlayStatus上报播放状态, userId:" + userId + ", videoId:" + videoId + ", sessionId:" + sessionId + ", sessionKey:" + sessionKey + ", reportType:" + reportType, ", requestId:" + requestId);
if (VideoReportType.BACK.value.equals(reportType)) {
onlineUser.setIsPlay(IsOrNot.NOT.value);
totalOnlineMap.put(sessionKey, onlineUser);
totalOnlineMap.put(cacheKey, onlineUser);
} else if (VideoReportType.FRONT.value.equals(reportType)) {
onlineUser.setIsPlay(IsOrNot.IS.value);
totalOnlineMap.put(sessionKey, onlineUser);
totalOnlineMap.put(cacheKey, onlineUser);
}
publishSessionVideoMessage(videoId, sessionId, requestId);
} catch (BizException e) {
LoggerUtil.error("上报播放状态异常:" + ExceptionUtils.getStackTrace(e));
LoggerUtil.error("上报播放状态异常:" + ExceptionUtil.stacktraceToString(e));
if (StrUtil.isNotEmpty(sessionId)) {
publishSessionVideoMessage(videoId, sessionId, e);
}
} catch (Exception e) {
LoggerUtil.error("上报播放状态异常:" + ExceptionUtils.getStackTrace(e));
LoggerUtil.error("上报播放状态异常:" + ExceptionUtil.stacktraceToString(e));
if (StrUtil.isNotEmpty(sessionId)) {
publishSessionVideoMessage(videoId, sessionId, new BizException(ResponseStatus.SYS_BUSY, "上报播放状态失败"));
}