合并OnlineUser逻辑

This commit is contained in:
easonzhu 2025-02-28 17:12:33 +08:00
parent 90ef2ed34f
commit ff52ad5806
21 changed files with 225 additions and 220 deletions

View File

@ -42,7 +42,6 @@ import com.syzb.rbac.entity.UsersRoles;
import com.syzb.rbac.mapper.UsersRolesMapper;
import com.syzb.rbac.service.DeptService;
import com.syzb.rbac.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

View File

@ -12,11 +12,9 @@ import com.syzb.common.util.logger.LoggerUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -10,9 +10,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
@Configuration
public class HazelcastConfiguration {
@ -23,6 +21,8 @@ public class HazelcastConfiguration {
@Value("${hazelcast.serverPort}")
private Integer serverPort;
private Set<String> cacheNameSet = new HashSet<>();
@Bean
public HazelcastInstance hazelcastInstance() {
List<String> memberList = Arrays.asList(members.split(","));
@ -39,8 +39,10 @@ public class HazelcastConfiguration {
config.setInstanceName(instanceName);
config.setClusterName(clusterName);
for (Map.Entry<String, CacheConfig.LocalMapConfig> entry : CacheConfig.getConfigMap().entrySet()) {
String cacheName = entry.getKey();
cacheNameSet.add(cacheName);
config.addMapConfig(new MapConfig()
.setName(entry.getKey())
.setName(cacheName)
.setEvictionConfig(new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LRU)
.setSize(entry.getValue().maxSize)
@ -79,6 +81,12 @@ public class HazelcastConfiguration {
return instance;
}
public void cleanCache() {
for (String cacheName : cacheNameSet) {
hazelcastInstance().getMap(cacheName).clear();
}
}
private static void createMapping(HazelcastInstance instance) {
SqlService sqlService = instance.getSql();
sqlService.execute("CREATE OR REPLACE MAPPING " + CacheKey.VIDEO_ONLINE_USER + " TYPE IMap OPTIONS (" +

View File

@ -0,0 +1,30 @@
package com.syzb.common.controller;
import com.syzb.common.config.cache.HazelcastConfiguration;
import com.syzb.common.result.CommonResult;
import com.syzb.common.result.ResponseStatus;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.validation.constraints.NotBlank;
@RestController
@Validated
public class CacheController {
@Resource
private HazelcastConfiguration hazelcastConfiguration;
@GetMapping("/admin/cache/clean")
public CommonResult<Void> clean(@Validated @RequestParam("code") @NotBlank String code) {
if (!code.equals("hello_syzb")) {
return CommonResult.error(ResponseStatus.PERMISSION_ERROR);
}
hazelcastConfiguration.cleanCache();
return CommonResult.success();
}
}

View File

@ -4,11 +4,10 @@ import com.hazelcast.map.IMap;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.vo.FrontUserVO;
import com.syzb.group.service.common.GroupCacheService;
import com.syzb.group.service.common.GroupMessageService;
import com.syzb.video.helper.VideoHelper;
import com.syzb.video.service.common.VideoCacheService;
import com.syzb.video.service.common.VideoMessageService;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
@ -20,17 +19,14 @@ import java.util.Map;
@Component
public class WebSocketSessionHandler {
@Resource
private VideoCacheService videoCacheService;
@Resource
private VideoMessageService videoMessageService;
@Resource
private GroupCacheService groupCacheService;
private GroupMessageService groupMessageService;
@Resource
private GroupMessageService groupMessageService;
private OnlineUserService onlineUserService;
public void handleConnect(StompHeaderAccessor header) {
Map<String, Object> attributes = header.getSessionAttributes();
@ -88,7 +84,7 @@ public class WebSocketSessionHandler {
}
private void updateOnlineStatus(Integer productType, Integer productId, String sessionKey, OnlineUser onlineUser) {
IMap<String, OnlineUser> totalOnlineMap = getTotalOnlineMap(productType, productId);
IMap<String, OnlineUser> totalOnlineMap = onlineUserService.getTotalOnlineMap(productType);
String cacheKey = VideoHelper.buildOnlineUserCacheKey(productId, sessionKey);
if (totalOnlineMap != null) {
totalOnlineMap.put(cacheKey, onlineUser);
@ -104,7 +100,7 @@ public class WebSocketSessionHandler {
Integer productType = sessionInfo.getProductType();
Integer productId = sessionInfo.getProductId();
String sessionKey = sessionInfo.getSessionKey();
IMap<String, OnlineUser> totalOnlineMap = getTotalOnlineMap(productType, productId);
IMap<String, OnlineUser> totalOnlineMap = onlineUserService.getTotalOnlineMap(productType);
String cacheKey = VideoHelper.buildOnlineUserCacheKey(productId, sessionKey);
OnlineUser onlineUser = totalOnlineMap.get(cacheKey);
if (onlineUser != null) {
@ -120,15 +116,6 @@ public class WebSocketSessionHandler {
onlineUser.setExitTime(LocalDateTime.now());
}
private IMap<String, OnlineUser> getTotalOnlineMap(Integer productType, Integer productId) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
return videoCacheService.getTotalOnlineMap();
} else if (ProductType.GROUP.value.equals(productType)) {
return groupCacheService.getTotalOnlineMap();
}
return null;
}
private void memberNotify(Integer productType, Integer productId, OnlineUser onlineUser) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
videoMessageService.memberNotify(productId, onlineUser);

View File

@ -0,0 +1,134 @@
package com.syzb.common.service;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.syzb.common.config.cache.CacheKey;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.handler.BizException;
import com.syzb.common.result.ResponseStatus;
import com.syzb.common.util.logger.LoggerUtil;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.syzb.common.config.cache.CacheKey.*;
@Service
public class OnlineUserService {
@Resource
private HazelcastInstance hazelcastInstance;
/**
* 获取总在线用户映射
*
* @return 在线用户映射
*/
public IMap<String, OnlineUser> getTotalOnlineMap(Integer productType) {
String cacheName = getOnlineCacheName(productType);
return hazelcastInstance.getMap(cacheName);
}
public List<OnlineUser> getTotalOnlineList(Integer productType, Integer productId) {
String cacheName = getOnlineCacheName(productType);
String sql = "SELECT * FROM " + cacheName + " WHERE productId = " + productId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
List<OnlineUser> list = new ArrayList<>();
for (SqlRow row : result) {
OnlineUser user = new OnlineUser();
user.setProductType(row.getObject("productType"));
user.setProductId(row.getObject("productId"));
user.setUserId(row.getObject("userId"));
user.setUserName(row.getObject("userName"));
user.setImg(row.getObject("img"));
user.setSessionId(row.getObject("sessionId"));
user.setIsOnline(row.getObject("isOnline"));
user.setIsPlay(row.getObject("isPlay"));
user.setCreateTime(row.getObject("createTime"));
user.setExitTime(row.getObject("exitTime"));
list.add(user);
}
return list;
}
public Set<String> getOnlineUserIds(Integer productType, Integer productId) {
String cacheName = getOnlineCacheName(productType);
String sql = "SELECT distinct userId FROM " + cacheName + " WHERE productId = " + productId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
Set<String> set = new HashSet<>();
for (SqlRow row : result) {
set.add(row.getObject("userId"));
}
return set;
}
/**
* 获取在线人数
*
* @return 在线人数
*/
public int getOnlineCount(Integer productType, Integer productId) {
String mainCacheName = getMainCacheName(productType);
IMap<String, Object> map = hazelcastInstance.getMap(mainCacheName);
String cacheKey = getOnlineCountCacheKey(productType, productId);
Integer onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
synchronized (this) {
onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
long startTime = System.currentTimeMillis();
onlineCount = getOnlineCountWithoutCache(productType, productId);
LoggerUtil.websocket.info("getOnlineCount-" + productType + "-" + productId + ":" + (System.currentTimeMillis() - startTime) + "ms");
map.put(cacheKey, onlineCount, 2, TimeUnit.SECONDS);
}
return onlineCount;
}
public int getOnlineCountWithoutCache(Integer productType, Integer productId) {
String cacheName = getOnlineCacheName(productType);
String sql = "SELECT COUNT(*) FROM " + cacheName + " WHERE productId = " + productId + " and isOnline = 1";
SqlResult result = hazelcastInstance.getSql().execute(sql);
Iterator<SqlRow> iter = result.iterator();
if (iter.hasNext()) {
return ((Long)iter.next().getObject(0)).intValue();
}
return 0;
}
private String getMainCacheName(Integer productType) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
return VIDEO_LIVE;
} else if (ProductType.GROUP.value.equals(productType)) {
return GROUP;
}
throw new BizException(ResponseStatus.SYS_BUSY, "productType错误");
}
private String getOnlineCacheName(Integer productType) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
return VIDEO_ONLINE_USER;
} else if (ProductType.GROUP.value.equals(productType)) {
return GROUP_ONLINE_USER;
}
throw new BizException(ResponseStatus.SYS_BUSY, "productType错误");
}
private String getOnlineCountCacheKey(Integer productType, Integer productId) {
if (ProductType.VIDEO_SINGLE.value.equals(productType)) {
return CacheKey.VideoLiveKey.ONLINE_COUNT + productId;
} else if (ProductType.GROUP.value.equals(productType)) {
return CacheKey.GroupKey.ONLINE_COUNT + productId;
}
throw new BizException(ResponseStatus.SYS_BUSY, "productType错误");
}
}

View File

@ -4,9 +4,6 @@ import com.syzb.video.constant.VideoLiveColumnStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
@Configuration
public class VideoLiveColumnStateMachine {
@Bean

View File

@ -3,7 +3,6 @@ package com.syzb.group.query.info;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.syzb.common.constant.IsDisplay;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.vo.BackendUserVO;
import com.syzb.group.constant.GroupInfoStatus;
import com.syzb.group.entity.GroupInfo;

View File

@ -13,6 +13,7 @@ import com.syzb.common.constant.ProductType;
import com.syzb.common.query.OnlyIdQuery;
import com.syzb.common.result.Pager;
import com.syzb.common.service.CommentBlackService;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.vo.BackendUserVO;
import com.syzb.common.vo.IdCountVO;
import com.syzb.group.constant.GroupCustomerStatus;
@ -25,7 +26,6 @@ import com.syzb.group.entity.GroupUserCollect;
import com.syzb.group.mapper.*;
import com.syzb.group.query.ListGroupCustomerQuery;
import com.syzb.group.query.QueryGroupCollectQuery;
import com.syzb.group.service.common.GroupCacheService;
import com.syzb.group.service.common.GroupCommonService;
import com.syzb.group.vo.GroupCollectVO;
import com.syzb.group.vo.GroupCustomerVO;
@ -69,7 +69,7 @@ public class AdminGroupCollectService {
private CommentBlackService commentBlackService;
@Resource
private GroupCacheService groupCacheService;
private OnlineUserService onlineUserService;
@Resource
private GroupUserFlowMapper groupUserFlowMapper;
@ -242,7 +242,7 @@ public class AdminGroupCollectService {
.in(WxUser::getId, userIdSet));
Map<String, WxUser> wxUserMap = wxUserList.stream().collect(Collectors.toMap(WxUser::getId, Function.identity()));
Set<String> blackUserIds = commentBlackService.getBlackUserIds(groupId, ProductType.GROUP.value);
Set<String> onlineUserIds = groupCacheService.getOnlineUserIds(groupId);
Set<String> onlineUserIds = onlineUserService.getOnlineUserIds(ProductType.GROUP.value, groupId);
Map<String, LocalDateTime> lastVisitTimeMap = queryLastVisitTime(groupId);
Map<String, LocalDateTime> lastChatTimeMap = queryLastChatTime(groupId);
Stream<ModuleUser> stream = moduleUserList.stream();

View File

@ -21,7 +21,6 @@ import com.syzb.common.vo.AuthResultVO;
import com.syzb.common.vo.BackendUserVO;
import com.syzb.common.vo.FrontUserVO;
import com.syzb.common.vo.InsertIdVO;
import com.syzb.course.entity.ShortVideo;
import com.syzb.course.query.SetMainPageQuery;
import com.syzb.course.service.PageService;
import com.syzb.group.constant.GroupInfoStatus;

View File

@ -19,7 +19,6 @@ import com.syzb.group.mapper.GroupInfoMapper;
import com.syzb.group.query.info.ListGroupAppQuery;
import com.syzb.group.service.admin.AdminGroupInfoService;
import com.syzb.group.vo.GroupVO;
import com.syzb.rbac.service.AuthService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

View File

@ -4,14 +4,10 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.syzb.advisor.vo.AdvisorBasicVO;
import com.syzb.common.config.cache.CacheKey;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.service.CacheService;
import com.syzb.common.util.logger.LoggerUtil;
import com.syzb.common.vo.DateIdVO;
import com.syzb.group.constant.GroupInteractiveType;
import com.syzb.group.constant.GroupMessageUserType;
@ -27,10 +23,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.syzb.common.config.cache.CacheKey.GROUP;
import static com.syzb.common.config.cache.CacheKey.GROUP_ONLINE_USER;
@Service
public class GroupCacheService {
@ -159,82 +151,6 @@ public class GroupCacheService {
return vo;
}
/**
* 获取总在线用户映射
*
* @return 在线用户映射
*/
public IMap<String, OnlineUser> getTotalOnlineMap() {
return hazelcastInstance.getMap(GROUP_ONLINE_USER);
}
public List<OnlineUser> getTotalOnlineList(Integer groupId) {
String sql = "SELECT * FROM " + GROUP_ONLINE_USER + " WHERE productId = " + groupId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
List<OnlineUser> list = new ArrayList<>();
for (SqlRow row : result) {
OnlineUser user = new OnlineUser();
user.setProductType(row.getObject("productType"));
user.setProductId(row.getObject("productId"));
user.setUserId(row.getObject("userId"));
user.setUserName(row.getObject("userName"));
user.setImg(row.getObject("img"));
user.setSessionId(row.getObject("sessionId"));
user.setIsOnline(row.getObject("isOnline"));
user.setIsPlay(row.getObject("isPlay"));
user.setCreateTime(row.getObject("createTime"));
user.setExitTime(row.getObject("exitTime"));
list.add(user);
}
return list;
}
public Set<String> getOnlineUserIds(Integer groupId) {
String sql = "SELECT distinct userId FROM " + GROUP_ONLINE_USER + " WHERE productId = " + groupId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
Set<String> set = new HashSet<>();
for (SqlRow row : result) {
set.add(row.getObject("userId"));
}
return set;
}
/**
* 获取在线人数
*
* @param groupId 视频ID
* @return 在线人数
*/
public int getOnlineCount(Integer groupId) {
IMap<String, Object> map = hazelcastInstance.getMap(GROUP);
String cacheKey = CacheKey.GroupKey.ONLINE_COUNT + groupId;
Integer onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
synchronized (this) {
onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
long startTime = System.currentTimeMillis();
onlineCount = getOnlineCountWithoutCache(groupId);
LoggerUtil.websocket.info("getOnlineCount-" + groupId + ":" + (System.currentTimeMillis() - startTime) + "ms");
map.put(cacheKey, onlineCount, 2, TimeUnit.SECONDS);
}
return onlineCount;
}
public int getOnlineCountWithoutCache(Integer groupId) {
String sql = "SELECT COUNT(*) FROM " + GROUP_ONLINE_USER + " WHERE productId = " + groupId + " and isOnline = 1";
SqlResult result = hazelcastInstance.getSql().execute(sql);
Iterator<SqlRow> iter = result.iterator();
if (iter.hasNext()) {
return ((Long)iter.next().getObject(0)).intValue();
}
return 0;
}
private static String buildMessageIdSetKey(String prefix, Integer groupId, String userId, QueryGroupMessageType type) {
String cacheKey = prefix + groupId + "|" + type.value;
if (QueryGroupMessageType.PRIVATE.equals(type)) {

View File

@ -9,7 +9,9 @@ import com.hazelcast.map.IMap;
import com.syzb.business.entity.ModuleUser;
import com.syzb.business.mapper.ModuleUserMapper;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.util.logger.LoggerUtil;
import com.syzb.group.entity.GroupInfo;
import com.syzb.group.entity.GroupUserFlow;
@ -28,7 +30,7 @@ import java.util.stream.Collectors;
public class GroupCommonService {
@Resource
private GroupCacheService groupCacheService;
private OnlineUserService onlineUserService;
@Resource
private GroupInfoMapper groupInfoMapper;
@ -44,7 +46,7 @@ public class GroupCommonService {
*/
@Transactional(rollbackFor = Exception.class)
public void saveGroupUser() {
IMap<String, OnlineUser> onlineMap = groupCacheService.getTotalOnlineMap();
IMap<String, OnlineUser> onlineMap = onlineUserService.getTotalOnlineMap(ProductType.GROUP.value);
Collection<OnlineUser> onlineUsers = onlineMap.values();
if (CollUtil.isEmpty(onlineUsers)) {
return;

View File

@ -14,7 +14,6 @@ import com.syzb.common.constant.UserStatus;
import com.syzb.common.constant.UserType;
import com.syzb.common.handler.BizException;
import com.syzb.common.result.ResponseStatus;
import com.syzb.common.state.StateMachine;
import com.syzb.common.util.CodecUtil;
import com.syzb.common.util.JwtUtil;
import com.syzb.common.vo.AuthVO;

View File

@ -14,14 +14,15 @@ import com.syzb.advisor.entity.AdvisorFollow;
import com.syzb.advisor.mapper.AdvisorFollowMapper;
import com.syzb.common.constant.IsFollow;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.service.OnlineUserService;
import com.syzb.video.constant.VideoLiveStatus;
import com.syzb.video.constant.VideoMessageContentType;
import com.syzb.video.constant.VideoMessageStatus;
import com.syzb.video.constant.VideoUserRecordType;
import com.syzb.video.entity.*;
import com.syzb.video.mapper.*;
import com.syzb.video.service.common.VideoCacheService;
import com.syzb.video.vo.statistic.VideoStatisticStaffDetailVO;
import com.syzb.video.vo.statistic.VideoStatisticUserDetailVO;
import org.springframework.beans.factory.annotation.Value;
@ -73,7 +74,7 @@ public class AdminVideoInteractionService {
private VideoLiveMapper videoLiveMapper;
@Resource
private VideoCacheService videoCacheService;
private OnlineUserService onlineUserService;
@Value("${video.finishReadRatio}")
private Double finishReadRatio;
@ -563,7 +564,7 @@ public class AdminVideoInteractionService {
*/
@Transactional(rollbackFor = Exception.class)
public void saveVideoUserDataToDB() {
IMap<String, OnlineUser> onlineMap = videoCacheService.getTotalOnlineMap();
IMap<String, OnlineUser> onlineMap = onlineUserService.getTotalOnlineMap(ProductType.VIDEO_SINGLE.value);
Collection<OnlineUser> onlineUsers = onlineMap.values();
if (CollUtil.isEmpty(onlineUsers)) {
return;

View File

@ -19,6 +19,7 @@ import com.syzb.common.handler.BizException;
import com.syzb.common.result.Pager;
import com.syzb.common.result.ResponseStatus;
import com.syzb.common.service.CacheService;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.vo.BackendUserVO;
import com.syzb.rbac.entity.Dept;
import com.syzb.rbac.entity.UserDept;
@ -93,10 +94,12 @@ public class AdminVideoStatisticService {
@Resource
private VideoCloudService videoCloudService;
@Resource
private OnlineUserService onlineUserService;
@Value("${video.finishReadRatio}")
private Double finishReadRatio;
/**
* 单场分析
*
@ -637,7 +640,7 @@ public class AdminVideoStatisticService {
}
Page<VideoStatisticUserDetailVO> page = adminVideoInteractionService.selectVideoUserDetail(query.toPage(), wrapper);
List<VideoStatisticUserDetailVO> records = page.getRecords();
Set<String> onlineUserIdSet = videoCacheService.getOnlineUserIds(videoId);
Set<String> onlineUserIdSet = onlineUserService.getOnlineUserIds(ProductType.VIDEO_SINGLE.value, videoId);
Map<Integer, String> userNameMap = userService.getUserMap().values().stream().collect(Collectors.toMap(UserDept::getUserId, UserDept::getName));
Map<String, Integer> readMinuteMap = adminVideoInteractionService.calcuVideoReadMap(videoId, null, null, null);
Map<String, Integer> messageCountMap = adminVideoInteractionService.calUserMessageCount(videoId, null, VideoMessageContentType.TEXT);
@ -857,7 +860,7 @@ public class AdminVideoStatisticService {
// 填充实时在线人数
// 直播在线人数峰值
vo.setOnlineMostNum(adminVideoInteractionService.getOnlineMax(videoId));
vo.setOnlineNum(!VideoLiveStatus.HAS_ENDED.value.equals(liveStatus) ? videoCacheService.getOnlineCount(videoId) : 0);
vo.setOnlineNum(!VideoLiveStatus.HAS_ENDED.value.equals(liveStatus) ? onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId) : 0);
// 直播观看
if (realStartTime == null && Objects.equals(video.getPlayType(), VideoPlayType.LIVE.value)) {
vo.setReadNum(0);
@ -1047,7 +1050,7 @@ public class AdminVideoStatisticService {
*/
public UserOnlineVO queryUserOnline(UserOnlineQuery query) {
Integer videoId = query.getVideoId();
Set<String> onlineUserIds = videoCacheService.getOnlineUserIds(videoId);
Set<String> onlineUserIds = onlineUserService.getOnlineUserIds(ProductType.VIDEO_SINGLE.value, videoId);
String userId = query.getUserId();
Integer isOnline = onlineUserIds.contains(userId) ? IsOrNot.IS.value : IsOrNot.NOT.value;
return new UserOnlineVO(userId, videoId, isOnline);

View File

@ -13,6 +13,7 @@ import com.syzb.common.constant.ProductType;
import com.syzb.common.handler.BizException;
import com.syzb.common.result.AppPager;
import com.syzb.common.result.ResponseStatus;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.service.RecommendService;
import com.syzb.common.vo.FrontUserVO;
import com.syzb.rbac.service.UserService;
@ -53,6 +54,9 @@ public class AppVideoColumnService {
@Resource
private UserService userService;
@Resource
private OnlineUserService onlineUserService;
/**
* 获取主栏目列表
*
@ -66,7 +70,7 @@ public class AppVideoColumnService {
ColumnRecommendAppVO columnRecommendAppVO = new ColumnRecommendAppVO(columnId, columnName, null, null);
Integer videoId = videoCacheService.getVideoColumnLivingVideo(columnId);
columnRecommendAppVO.setStatus(videoId == null ? null : VideoLiveStatus.LIVING.value);
columnRecommendAppVO.setOnline(videoId == null ? null : videoCacheService.getOnlineCount(videoId));
columnRecommendAppVO.setOnline(videoId == null ? null : onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId));
return columnRecommendAppVO;
}).collect(Collectors.toList());
}
@ -138,7 +142,7 @@ public class AppVideoColumnService {
}
VideoColumnAppVO vo = new VideoColumnAppVO(column);
Integer lastVideoId = videoCacheService.getVideoColumnLivingVideo(id);
vo.setLastVideo(lastVideoId != null ? new VideoBasicInfoVO(lastVideoId, videoCacheService.getOnlineCount(lastVideoId)) : null);
vo.setLastVideo(lastVideoId != null ? new VideoBasicInfoVO(lastVideoId, onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, lastVideoId)) : null);
vo.setReadCount(videoCacheService.getVideoColumnReadCount(vo.getId()));
if (frontUserVO != null) {
vo.setIsSub(videoCacheService.checkFollowColumn(frontUserVO.getUserId(), column.getId()));

View File

@ -98,6 +98,9 @@ public class AppVideoInfoService {
@Resource
private VideoLiveMixMapper videoLiveMixMapper;
@Resource
private OnlineUserService onlineUserService;
@Value("${resizeUrl.urlMain}")
private String urlMain;
@ -379,7 +382,7 @@ public class AppVideoInfoService {
Integer liveStatus = vo.getLiveStatus();
vo.setOnline(0);
if (VideoLiveStatus.LIVING.value.equals(liveStatus)) {
vo.setOnline(videoCacheService.getOnlineCount(vo.getId()));
vo.setOnline(onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, vo.getId()));
}
voList.add(vo);
if (--size == 0) {
@ -478,7 +481,7 @@ public class AppVideoInfoService {
}
vo.setFavorUserCount(videoCacheService.getVideoFavorUserCount(videoId));
vo.setSubscribeUserCount(videoCacheService.getVideoSubscribeUserCount(videoId));
vo.setOnline(videoCacheService.getOnlineCount(videoId));
vo.setOnline(onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId));
if (video.getProductId() != null && video.getProductType() != null) {
Table<Integer, Integer, MergeProductInfoVO> table = mergeProductService.queryMergeProductInfo(Collections.singletonList(video));
vo.setInfoVO(table.get(video.getProductType(), video.getProductId()));
@ -608,7 +611,7 @@ public class AppVideoInfoService {
List<VideoFollowAdvisorInfoAppVO> mergeVos = getRecommendedVideos(advisorIds, includeVideoIds);
appVOS = appVOS.stream().peek(a -> {
Integer videoId = a.getVideoId();
Integer online = videoCacheService.getOnlineCount(videoId);
Integer online = onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId);
a.setOnline(online);
}).collect(Collectors.toList());

View File

@ -8,8 +8,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.hazelcast.collection.ISet;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.syzb.advisor.constant.FollowOption;
import com.syzb.advisor.entity.AdvisorBasic;
import com.syzb.advisor.service.AdvisorInfoService;
@ -17,13 +15,11 @@ import com.syzb.common.config.cache.CacheKey;
import com.syzb.common.constant.IsActive;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.result.AppPager;
import com.syzb.common.service.AdvertService;
import com.syzb.common.service.CacheService;
import com.syzb.common.service.RecommendService;
import com.syzb.common.service.TagService;
import com.syzb.common.util.logger.LoggerUtil;
import com.syzb.common.vo.FrontUserVO;
import com.syzb.common.vo.TagVO;
import com.syzb.course.constant.CourseContentType;
@ -56,7 +52,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -163,82 +158,6 @@ public class VideoCacheService {
return hazelcastInstance.getMap(mapName);
}
/**
* 获取总在线用户映射
*
* @return 在线用户映射
*/
public IMap<String, OnlineUser> getTotalOnlineMap() {
return hazelcastInstance.getMap(VIDEO_ONLINE_USER);
}
public List<OnlineUser> getTotalOnlineList(Integer videoId) {
String sql = "SELECT * FROM " + VIDEO_ONLINE_USER + " WHERE productId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
List<OnlineUser> list = new ArrayList<>();
for (SqlRow row : result) {
OnlineUser user = new OnlineUser();
user.setProductType(row.getObject("productType"));
user.setProductId(row.getObject("productId"));
user.setUserId(row.getObject("userId"));
user.setUserName(row.getObject("userName"));
user.setImg(row.getObject("img"));
user.setSessionId(row.getObject("sessionId"));
user.setIsOnline(row.getObject("isOnline"));
user.setIsPlay(row.getObject("isPlay"));
user.setCreateTime(row.getObject("createTime"));
user.setExitTime(row.getObject("exitTime"));
list.add(user);
}
return list;
}
public Set<String> getOnlineUserIds(Integer videoId) {
String sql = "SELECT distinct userId FROM " + VIDEO_ONLINE_USER + " WHERE productId = " + videoId;
SqlResult result = hazelcastInstance.getSql().execute(sql);
Set<String> set = new HashSet<>();
for (SqlRow row : result) {
set.add(row.getObject("userId"));
}
return set;
}
/**
* 获取在线人数
*
* @param videoId 视频ID
* @return 在线人数
*/
public int getOnlineCount(Integer videoId) {
IMap<String, Object> map = hazelcastInstance.getMap(VIDEO_LIVE);
String cacheKey = VideoLiveKey.ONLINE_COUNT + videoId;
Integer onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
synchronized (this) {
onlineCount = (Integer) map.get(cacheKey);
if (onlineCount != null) {
return onlineCount;
}
long startTime = System.currentTimeMillis();
onlineCount = getOnlineCountWithoutCache(videoId);
LoggerUtil.websocket.info("getOnlineCount-" + videoId + ":" + (System.currentTimeMillis() - startTime) + "ms");
map.put(cacheKey, onlineCount, 2, TimeUnit.SECONDS);
}
return onlineCount;
}
public int getOnlineCountWithoutCache(Integer videoId) {
String sql = "SELECT COUNT(*) FROM " + VIDEO_ONLINE_USER + " WHERE productId = " + videoId + " and isOnline = 1";
SqlResult result = hazelcastInstance.getSql().execute(sql);
Iterator<SqlRow> iter = result.iterator();
if (iter.hasNext()) {
return ((Long)iter.next().getObject(0)).intValue();
}
return 0;
}
public Integer getVideoSubscribeUserCount(Integer videoId) {
return getVideoUserSubscribeIds(videoId).size();
}

View File

@ -110,6 +110,9 @@ public class VideoCommonService {
@Resource
private CacheService cacheService;
@Resource
private OnlineUserService onlineUserService;
// 防抖延时(ms)
private final long pushDelay = 10 * 1000;
@ -344,7 +347,7 @@ public class VideoCommonService {
try {
Integer videoId = video.getId();
String videoName = video.getTitle();
Set<String> onlineUserIds = videoCacheService.getOnlineUserIds(videoId);
Set<String> onlineUserIds = onlineUserService.getOnlineUserIds(ProductType.VIDEO_SINGLE.value, videoId);
// 查询预约的用户
LambdaQueryWrapper<VideoLiveUser> subWrapper = Wrappers.<VideoLiveUser>lambdaQuery()
@ -571,7 +574,7 @@ public class VideoCommonService {
//观看
Integer readCount = videoCacheService.getVideoReadCount(videoId);
//在线数
int onlineCount = videoCacheService.getOnlineCount(videoId);
int onlineCount = onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId);
VideoPcAdvisorMessageVO messageVO = new VideoPcAdvisorMessageVO(favorCount, readCount, onlineCount);
videoMessageService.publishPcAdvisorMessage(videoId, messageVO);
}

View File

@ -10,9 +10,11 @@ import com.hazelcast.map.IMap;
import com.hazelcast.topic.ITopic;
import com.syzb.advisor.service.AdvisorInfoService;
import com.syzb.common.constant.IsOrNot;
import com.syzb.common.constant.ProductType;
import com.syzb.common.entity.OnlineUser;
import com.syzb.common.handler.BizException;
import com.syzb.common.result.ResponseStatus;
import com.syzb.common.service.OnlineUserService;
import com.syzb.common.util.logger.LoggerUtil;
import com.syzb.common.vo.FrontUserVO;
import com.syzb.rbac.service.UserService;
@ -66,6 +68,9 @@ public class VideoMessageService {
@Resource
private UserService userService;
@Resource
private OnlineUserService onlineUserService;
private ITopic<VideoWsMessageVO<?>> videoNotifyTopic = null;
private ITopic<VideoWsMessageVO<?>> videoMessageTopic = null;
@ -287,7 +292,7 @@ public class VideoMessageService {
}
vo.setAdvisorBasic(advisorInfoService.getAdvisorVoMap().get(message.getAdvisorId()));
if (VideoMessageContentType.USER_ENTER.value.equals(vo.getType())) {
vo.setOnline(videoCacheService.getOnlineCount(vo.getVideoId()));
vo.setOnline(onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value,vo.getVideoId()));
}
if (vo.getCreateUserId() != null) {
vo.setCreateUserVO(userService.get(message.getCreateUserId(), false));
@ -327,14 +332,14 @@ public class VideoMessageService {
*/
public void listMember(Integer videoId) {
//获取进入过直播间的人数
List<OnlineUser> totalOnlineList = videoCacheService.getTotalOnlineList(videoId);
List<OnlineUser> totalOnlineList = onlineUserService.getTotalOnlineList(ProductType.VIDEO_SINGLE.value, videoId);
int totalCount = totalOnlineList.size();
LoggerUtil.info("获取直播间在线人列表入参:" + videoId);
LoggerUtil.info("直播间id=" + videoId + "总人数:" + totalCount);
VideoCustomerVO videoCustomerVO = new VideoCustomerVO(null, 0, 0);
if (totalCount != 0) {
//获取在线人数
int onlineCount = videoCacheService.getOnlineCount(videoId);
int onlineCount = onlineUserService.getOnlineCount(ProductType.VIDEO_SINGLE.value, videoId);
LoggerUtil.info("直播间id=" + videoId + "在线人数:" + onlineCount);
videoCustomerVO = new VideoCustomerVO(totalOnlineList, onlineCount, totalCount);
}
@ -464,7 +469,7 @@ public class VideoMessageService {
LoggerUtil.error("reportPlayStatus上报类型错误, reportType:" + reportType);
throw new BizException(ResponseStatus.PARM_ERROR, "上报类型错误");
}
IMap<String, OnlineUser> totalOnlineMap = videoCacheService.getTotalOnlineMap();
IMap<String, OnlineUser> totalOnlineMap = onlineUserService.getTotalOnlineMap(ProductType.VIDEO_SINGLE.value);
String cacheKey = VideoHelper.buildOnlineUserCacheKey(videoId, sessionKey);
OnlineUser onlineUser = totalOnlineMap.get(cacheKey);
if (onlineUser == null) {