package com.chinamcloud.spiderMember.member.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.chinamcloud.spiderMember.common.enums.KafkaTopicEnum;
import com.chinamcloud.spiderMember.common.interceptor.UserSession;
import com.chinamcloud.spiderMember.common.result.ResultDTO;
import com.chinamcloud.spiderMember.member.entity.MemberMember;
import com.chinamcloud.spiderMember.member.entity.MemberModel;
import com.chinamcloud.spiderMember.member.enums.MemberOperationEnum;
import com.chinamcloud.spiderMember.member.service.MemberUpdateToKafkaService;
import com.chinamcloud.spiderMember.member.util.MemberRedisUtil;
import com.chinamcloud.spiderMember.member.vo.MemberUpdateDto;
import com.chinamcloud.spiderMember.member.vo.MemberUpdateResult;
import com.chinamcloud.spiderMember.util.StringUtil;
import java.util.List;
import java.util.Objects;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
/* loaded from: input_file:com/chinamcloud/spiderMember/member/service/impl/MemberUpdateToKafkaServiceImpl.class */
public class MemberUpdateToKafkaServiceImpl implements MemberUpdateToKafkaService {
    private static final Logger log = LoggerFactory.getLogger(MemberUpdateToKafkaServiceImpl.class);

    @Autowired(required = false)
    @Qualifier("singlekafkaTemplate")
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private MemberRedisUtil memberRedisUtil;

    @Override // com.chinamcloud.spiderMember.member.service.MemberUpdateToKafkaService
    @Async("taskExecutor")
    public ResultDTO operationToKafka(MemberUpdateDto memberUpdateDto) {
        if (Objects.isNull(memberUpdateDto)) {
            log.info("用户信息为空");
            return ResultDTO.fail("没有用户信息同步");
        }
        if (StringUtil.isEmpty(memberUpdateDto.getTenantId())) {
            log.info("未选择租户:{}", memberUpdateDto.getTenantId());
            return ResultDTO.fail("租户ID为空");
        }
        if (!MemberOperationEnum.getAllMemberOperation().contains(memberUpdateDto.getOperation())) {
            log.info("用户操作方式为空");
            return ResultDTO.fail("没有明确用户做什么操作");
        }
        MemberUpdateResult memberUpdateResult = new MemberUpdateResult();
        memberUpdateResult.setOperation(memberUpdateDto.getOperation());
        JSONObject jSONObject = new JSONObject(8);
        jSONObject.put("memberId", memberUpdateDto.getMemberId());
        jSONObject.put("userName", memberUpdateDto.getUserName());
        jSONObject.put("nickName", memberUpdateDto.getNickName());
        jSONObject.put("mobile", memberUpdateDto.getMobile());
        jSONObject.put("email", memberUpdateDto.getEmail());
        memberUpdateResult.setData(jSONObject);
        String str = memberUpdateDto.getTenantId() + "_" + KafkaTopicEnum.USER_UPDATE_TOPIC.getTopicName();
        String jSONString = JSON.toJSONString(memberUpdateResult);
        log.info("开始推送用户信息到下游,topic：{}，会员信息：{}", str, jSONString);
        this.kafkaTemplate.send(str, jSONString).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { // from class: com.chinamcloud.spiderMember.member.service.impl.MemberUpdateToKafkaServiceImpl.1
            public void onFailure(Throwable th) {
                MemberUpdateToKafkaServiceImpl.log.error("用户操作发送消息失败", th);
            }

            public void onSuccess(SendResult<String, String> sendResult) {
                MemberUpdateToKafkaServiceImpl.log.info("用户操作发送消息成功：{}", sendResult);
            }
        });
        return ResultDTO.success();
    }

    @Override // com.chinamcloud.spiderMember.member.service.MemberUpdateToKafkaService
    @Async("taskExecutor")
    public ResultDTO operationToKafka(MemberMember memberMember, Integer num, String str) {
        MemberUpdateResult memberUpdateResult = new MemberUpdateResult();
        memberUpdateResult.setOperation(num);
        JSONObject jSONObject = new JSONObject(8);
        jSONObject.put("memberId", memberMember.getId());
        jSONObject.put("userName", memberMember.getUsername());
        jSONObject.put("nickName", memberMember.getNickname());
        jSONObject.put("mobile", memberMember.getMobile());
        jSONObject.put("email", memberMember.getEmail());
        memberUpdateResult.setData(jSONObject);
        String str2 = str + "_" + KafkaTopicEnum.USER_UPDATE_TOPIC.getTopicName();
        String jSONString = JSON.toJSONString(memberUpdateResult);
        log.info("开始推送用户信息到下游,topic：{}，会员信息：{}", str2, jSONString);
        this.kafkaTemplate.send(str2, jSONString).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { // from class: com.chinamcloud.spiderMember.member.service.impl.MemberUpdateToKafkaServiceImpl.2
            public void onFailure(Throwable th) {
                MemberUpdateToKafkaServiceImpl.log.error("用户操作发送消息失败", th);
            }

            public void onSuccess(SendResult<String, String> sendResult) {
                MemberUpdateToKafkaServiceImpl.log.info("用户操作发送消息成功：{}", sendResult);
            }
        });
        return ResultDTO.success();
    }

    @Override // com.chinamcloud.spiderMember.member.service.MemberUpdateToKafkaService
    @Async("taskExecutor")
    public ResultDTO operationToKafka(Long l, Integer num, String str) {
        if (StringUtil.isEmpty(str)) {
            str = UserSession.get().getTenantId();
        }
        HintManager.clear();
        HintManager.getInstance().setDatabaseShardingValue(str);
        MemberModel memberModel = this.memberRedisUtil.getMemberModel(str, l);
        MemberMember memberMember = new MemberMember();
        memberMember.setId(memberModel.getId());
        memberMember.setUsername(memberModel.getUsername());
        memberMember.setNickname(memberModel.getNickname());
        memberMember.setMobile(memberModel.getMobile());
        memberMember.setEmail(memberModel.getEmail());
        return operationToKafka(memberMember, num, str);
    }

    @Override // com.chinamcloud.spiderMember.member.service.MemberUpdateToKafkaService
    @Async("taskExecutor")
    public ResultDTO sendAllMemberInfo(String str) {
        if (StringUtil.isEmpty(str)) {
            str = UserSession.get().getTenantId();
        }
        HintManager.clear();
        HintManager.getInstance().setDatabaseShardingValue(str);
        List<MemberModel> allMemberInfo = this.memberRedisUtil.getAllMemberInfo(str);
        Assert.notNull(allMemberInfo);
        String str2 = str + "_" + KafkaTopicEnum.USER_UPDATE_TOPIC.getTopicName();
        allMemberInfo.forEach(memberModel -> {
            MemberUpdateResult memberUpdateResult = new MemberUpdateResult();
            JSONObject jSONObject = new JSONObject(8);
            jSONObject.put("memberId", memberModel.getId());
            jSONObject.put("userName", memberModel.getUsername());
            jSONObject.put("nickName", memberModel.getNickname());
            jSONObject.put("mobile", memberModel.getMobile());
            jSONObject.put("email", memberModel.getEmail());
            memberUpdateResult.setData(jSONObject);
            memberUpdateResult.setOperation(MemberOperationEnum.MEMBER_ADD.getOperation());
            String jSONString = JSON.toJSONString(memberUpdateResult);
            log.info("开始推送用户信息到下游,topic：{}，会员信息：{}", str2, jSONString);
            this.kafkaTemplate.send(str2, jSONString).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { // from class: com.chinamcloud.spiderMember.member.service.impl.MemberUpdateToKafkaServiceImpl.3
                public void onFailure(Throwable th) {
                    MemberUpdateToKafkaServiceImpl.log.error("发送所有用户信息消息失败", th);
                }

                public void onSuccess(SendResult<String, String> sendResult) {
                    MemberUpdateToKafkaServiceImpl.log.info("发送所有用户信息消息成功：{}", sendResult);
                }
            });
        });
        return ResultDTO.success();
    }
}
