From eb760ab3121cb11331edb808376082f20058ef93 Mon Sep 17 00:00:00 2001
From: YunaiV <>
Date: Thu, 14 Mar 2019 22:17:56 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20RocketMQ=20=E7=BB=84?=
 =?UTF-8?q?=E4=BB=B6=20=E6=94=AF=E4=BB=98=E6=88=90=E5=8A=9F=E5=90=8E?=
 =?UTF-8?q?=EF=BC=8C=E5=9B=9E=E8=B0=83=E9=80=9A=E7=9F=A5=E4=B8=9A=E5=8A=A1?=
 =?UTF-8?q?=E7=BA=BF=E8=AE=A2=E5=8D=95=E6=94=AF=E4=BB=98=E6=88=90=E5=8A=9F?=
 =?UTF-8?q?=E7=9A=84=E9=80=BB=E8=BE=91=EF=BC=8C=E7=AE=80=E5=8D=95=E5=AE=8C?=
 =?UTF-8?q?=E6=88=90=E3=80=82=E5=90=8E=E7=BB=AD=EF=BC=8C=E9=9C=80=E8=A6=81?=
 =?UTF-8?q?=E5=B0=81=E8=A3=85=E4=B8=8B=EF=BC=8C=E5=AF=B9=E4=B8=8D=E5=90=8C?=
 =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E7=BA=BF=E7=9A=84=E5=9B=9E=E8=B0=83=E3=80=82?=
 =?UTF-8?q?=E4=BB=A5=E5=8F=8A=EF=BC=8Chttp=20=E5=9B=9E=E8=B0=83=E7=9A=84?=
 =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 common/common-framework/pom.xml               |   4 +
 .../common/framework/util/DateUtil.java       |  29 +++++
 .../common/framework/util/ExceptionUtil.java  |   5 +
 .../controller/users/PayDemoController.java   |   6 +-
 .../users/PayTransactionController.java       |   4 +-
 pay/pay-service-impl/pom.xml                  |   6 +
 .../mall/pay/biz/constant/MQConstant.java     |  13 +++
 .../biz/convert/PayTransactionConvert.java    |   5 +
 .../dao/PayTransactionNotifyLogMapper.java    |  11 ++
 .../dao/PayTransactionNotifyTaskMapper.java   |  13 +++
 .../dataobject/PayTransactionNotifyLogDO.java |  13 +++
 .../PayTransactionNotifyTaskDO.java           |  50 +++++++-
 .../mq/PayTransactionPaySuccessConsumer.java  | 108 ++++++++++++++++++
 .../mq/PayTransactionPaySuccessMessage.java   |  91 +++++++++++++++
 .../iocoder/mall/pay/biz/mq/package-info.java |   1 +
 .../pay/biz/scheduler/PayNotifyAppJob.java    |  25 ----
 .../scheduler/PayTransactionNotifyJob.java    |  51 +++++++++
 .../pay/biz/service/PayDemoServiceImpl.java   |   3 +-
 .../mall/pay/biz/service/PayServiceImpl.java  |  15 ++-
 .../main/resources/config/application.yaml    |   9 +-
 .../resources/mapper/PayTransactionMapper.xml |   3 +
 .../mapper/PayTransactionNotifyLogMapper.xml  |  46 ++++++++
 .../mapper/PayTransactionNotifyTaskMapper.xml |  29 +++--
 23 files changed, 488 insertions(+), 52 deletions(-)
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/constant/MQConstant.java
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyLogMapper.java
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessConsumer.java
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessMessage.java
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/package-info.java
 delete mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayNotifyAppJob.java
 create mode 100644 pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayTransactionNotifyJob.java
 create mode 100644 pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyLogMapper.xml

diff --git a/common/common-framework/pom.xml b/common/common-framework/pom.xml
index 52d055f6..e5ba8581 100644
--- a/common/common-framework/pom.xml
+++ b/common/common-framework/pom.xml
@@ -60,6 +60,10 @@
             <artifactId>fastjson</artifactId>
             <version>1.2.56</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git a/common/common-framework/src/main/java/cn/iocoder/common/framework/util/DateUtil.java b/common/common-framework/src/main/java/cn/iocoder/common/framework/util/DateUtil.java
index d5736e45..20f270b7 100644
--- a/common/common-framework/src/main/java/cn/iocoder/common/framework/util/DateUtil.java
+++ b/common/common-framework/src/main/java/cn/iocoder/common/framework/util/DateUtil.java
@@ -1,10 +1,39 @@
 package cn.iocoder.common.framework.util;
 
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;
 
 public class DateUtil {
 
+    /**
+     * 计算当期时间相差的日期
+     *
+     * @param field  日历字段.<br/>eg:Calendar.MONTH,Calendar.DAY_OF_MONTH,<br/>Calendar.HOUR_OF_DAY等.
+     * @param amount 相差的数值
+     * @return 计算后的日志
+     */
+    public static Date addDate(int field, int amount) {
+        return addDate(null, field, amount);
+    }
+
+    /**
+     * 计算当期时间相差的日期
+     *
+     * @param date   设置时间
+     * @param field  日历字段.<br/>eg:Calendar.MONTH,Calendar.DAY_OF_MONTH,<br/>Calendar.HOUR_OF_DAY等.
+     * @param amount 相差的数值
+     * @return 计算后的日志
+     */
+    public static Date addDate(Date date, int field, int amount) {
+        Calendar c = Calendar.getInstance();
+        if (date != null) {
+            c.setTime(date);
+        }
+        c.add(field, amount);
+        return c.getTime();
+    }
+
     /**
      * @param date    时间。若为空,则返回空串
      * @param pattern 时间格式化
diff --git a/common/common-framework/src/main/java/cn/iocoder/common/framework/util/ExceptionUtil.java b/common/common-framework/src/main/java/cn/iocoder/common/framework/util/ExceptionUtil.java
index 88bab3d7..35c81c4a 100644
--- a/common/common-framework/src/main/java/cn/iocoder/common/framework/util/ExceptionUtil.java
+++ b/common/common-framework/src/main/java/cn/iocoder/common/framework/util/ExceptionUtil.java
@@ -1,6 +1,7 @@
 package cn.iocoder.common.framework.util;
 
 import cn.iocoder.common.framework.exception.ServiceException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import javax.validation.ConstraintViolationException;
 import java.lang.reflect.InvocationTargetException;
@@ -44,4 +45,8 @@ public class ExceptionUtil {
         return null;
     }
 
+    public static String getRootCauseMessage(Throwable th) {
+        return ExceptionUtils.getRootCauseMessage(th);
+    }
+
 }
\ No newline at end of file
diff --git a/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayDemoController.java b/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayDemoController.java
index 71ae866d..176321c7 100644
--- a/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayDemoController.java
+++ b/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayDemoController.java
@@ -9,6 +9,7 @@ import com.alibaba.dubbo.config.annotation.Reference;
 import org.springframework.util.Assert;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.servlet.http.HttpServletRequest;
@@ -25,7 +26,8 @@ public class PayDemoController {
     private PayTransactionService payTransactionService;
 
     @PostMapping("/create_order")
-    public void createOrder(HttpServletRequest request) {
+    public void createOrder(HttpServletRequest request,
+                            @RequestParam("orderId") String orderId) {
         // 创建业务订单
         // ...
 
@@ -33,7 +35,7 @@ public class PayDemoController {
         PayTransactionCreateDTO payTransactionCreateDTO = new PayTransactionCreateDTO()
                 .setAppId("POd4RC6a")
                 .setCreateIp(HttpUtil.getIp(request))
-                .setOrderId("1")
+                .setOrderId(orderId)
                 .setOrderSubject("商品名" )
                 .setOrderDescription("商品描述")
                 .setOrderMemo("商品备注")
diff --git a/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayTransactionController.java b/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayTransactionController.java
index bd3b25e4..eeaa9bad 100644
--- a/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayTransactionController.java
+++ b/pay/pay-application/src/main/java/cn/iocoder/mall/pay/application/controller/users/PayTransactionController.java
@@ -52,8 +52,8 @@ public class PayTransactionController {
 //        JSONObject bodyObj = JSON.parseObject(sb.toString());
 //        bodyObj.put("webhookId", bodyObj.remove("id"));
 //        String body = bodyObj.toString();
-        payService.updateTransactionPaySuccess(PayChannelEnum.PINGXX.getId(), sb.toString());
-        return "";
+        CommonResult<Boolean> result = payService.updateTransactionPaySuccess(PayChannelEnum.PINGXX.getId(), sb.toString());
+        return result.isSuccess() ? "success" : "failure";
     }
 
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/pom.xml b/pay/pay-service-impl/pom.xml
index 4b892ecb..012b7ffb 100644
--- a/pay/pay-service-impl/pom.xml
+++ b/pay/pay-service-impl/pom.xml
@@ -73,6 +73,12 @@
             <version>2.0.1</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.0.1</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/constant/MQConstant.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/constant/MQConstant.java
new file mode 100644
index 00000000..57e1b00a
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/constant/MQConstant.java
@@ -0,0 +1,13 @@
+package cn.iocoder.mall.pay.biz.constant;
+
+/**
+ * MQ 枚举类
+ */
+public class MQConstant {
+
+    /**
+     * Topic - 支付交易单支付成功
+     */
+    public static final String TOPIC_PAY_TRANSACTION_PAY_SUCCESS = "PAY_TRANSACTION_PAY_SUCCESS";
+
+}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/convert/PayTransactionConvert.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/convert/PayTransactionConvert.java
index 10c2a395..e6688312 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/convert/PayTransactionConvert.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/convert/PayTransactionConvert.java
@@ -5,6 +5,8 @@ import cn.iocoder.mall.pay.api.dto.PayTransactionCreateDTO;
 import cn.iocoder.mall.pay.api.dto.PayTransactionSubmitDTO;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionExtensionDO;
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
+import cn.iocoder.mall.pay.biz.mq.PayTransactionPaySuccessMessage;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mappings;
 import org.mapstruct.factory.Mappers;
@@ -23,4 +25,7 @@ public interface PayTransactionConvert {
     @Mappings({})
     PayTransactionExtensionDO convert(PayTransactionSubmitDTO payTransactionSubmitDTO);
 
+    @Mappings({})
+    PayTransactionPaySuccessMessage convert(PayTransactionNotifyTaskDO payTransactionNotifyTaskDO);
+
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyLogMapper.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyLogMapper.java
new file mode 100644
index 00000000..c6dd400d
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyLogMapper.java
@@ -0,0 +1,11 @@
+package cn.iocoder.mall.pay.biz.dao;
+
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyLogDO;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface PayTransactionNotifyLogMapper {
+
+    void insert(PayTransactionNotifyLogDO entity);
+
+}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyTaskMapper.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyTaskMapper.java
index 13cd1103..3c2462e3 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyTaskMapper.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dao/PayTransactionNotifyTaskMapper.java
@@ -3,6 +3,8 @@ package cn.iocoder.mall.pay.biz.dao;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
+
 @Repository
 public interface PayTransactionNotifyTaskMapper {
 
@@ -10,4 +12,15 @@ public interface PayTransactionNotifyTaskMapper {
 
     int update(PayTransactionNotifyTaskDO entity);
 
+    /**
+     * 获得需要通知的 PayTransactionNotifyTaskDO 记录。需要满足如下条件:
+     *
+     * 1. status 非成功
+     * 2. nextNotifyTime 小于当前时间
+     * 3. lastExecuteTime > nextNotifyTime
+     *
+     * @return PayTransactionNotifyTaskDO 数组
+     */
+    List<PayTransactionNotifyTaskDO> selectByNotify();
+
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyLogDO.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyLogDO.java
index 99646ecd..e348c30c 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyLogDO.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyLogDO.java
@@ -13,6 +13,10 @@ public class PayTransactionNotifyLogDO extends BaseDO {
      * 日志编号,自增
      */
     private Integer id;
+    /**
+     * 通知编号
+     */
+    private Integer notifyId;
     /**
      * 请求参数
      */
@@ -64,4 +68,13 @@ public class PayTransactionNotifyLogDO extends BaseDO {
         return this;
     }
 
+    public Integer getNotifyId() {
+        return notifyId;
+    }
+
+    public PayTransactionNotifyLogDO setNotifyId(Integer notifyId) {
+        this.notifyId = notifyId;
+        return this;
+    }
+
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyTaskDO.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyTaskDO.java
index f7947523..9aebb9be 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyTaskDO.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/dataobject/PayTransactionNotifyTaskDO.java
@@ -1,6 +1,7 @@
 package cn.iocoder.mall.pay.biz.dataobject;
 
 import cn.iocoder.common.framework.dataobject.BaseDO;
+import cn.iocoder.mall.pay.biz.service.PayServiceImpl;
 
 import java.util.Date;
 
@@ -9,6 +10,16 @@ import java.util.Date;
  */
 public class PayTransactionNotifyTaskDO extends BaseDO {
 
+    /**
+     * 通知频率,单位为秒。
+     *
+     * 算上首次的通知,实际是一共 1 + 8 = 9 次。
+     */
+    public static final Integer[] NOTIFY_FREQUENCY = new Integer[]{
+            15, 15, 30, 180,
+            1800, 1800, 1800, 3600
+    };
+
     /**
      * 编号,自增
      */
@@ -40,9 +51,26 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
      */
     private Integer status;
     /**
-     * 最后一次通知时间
+     * 下一次通知时间
+     */
+    private Date nextNotifyTime;
+    /**
+     * 最后一次执行时间
+     *
+     * 这个字段,需要结合 {@link #nextNotifyTime} 一起使用。
+     *
+     * 1. 初始时,{@link PayServiceImpl#updateTransactionPaySuccess(Integer, String)}
+     *      nextNotifyTime 为当前时间 + 15 秒
+     *      lastExecuteTime 为空
+     *      并发送给 MQ ,执行执行
+     *
+     * 2. MQ 消费时,更新 lastExecuteTime 为当时时间
+     *
+     * 3. 定时任务,扫描 nextNotifyTime < lastExecuteTime 的任务
+     *      nextNotifyTime 为当前时间 + N 秒。具体的 N ,由第几次通知决定
+     *      lastExecuteTime 为当前时间
      */
-    private Date lastNotifyTime;
+    private Date lastExecuteTime;
     /**
      * 当前通知次数
      */
@@ -92,12 +120,12 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
         return this;
     }
 
-    public Date getLastNotifyTime() {
-        return lastNotifyTime;
+    public Date getNextNotifyTime() {
+        return nextNotifyTime;
     }
 
-    public PayTransactionNotifyTaskDO setLastNotifyTime(Date lastNotifyTime) {
-        this.lastNotifyTime = lastNotifyTime;
+    public PayTransactionNotifyTaskDO setNextNotifyTime(Date nextNotifyTime) {
+        this.nextNotifyTime = nextNotifyTime;
         return this;
     }
 
@@ -145,4 +173,14 @@ public class PayTransactionNotifyTaskDO extends BaseDO {
         this.notifyUrl = notifyUrl;
         return this;
     }
+
+    public Date getLastExecuteTime() {
+        return lastExecuteTime;
+    }
+
+    public PayTransactionNotifyTaskDO setLastExecuteTime(Date lastExecuteTime) {
+        this.lastExecuteTime = lastExecuteTime;
+        return this;
+    }
+
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessConsumer.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessConsumer.java
new file mode 100644
index 00000000..6b572e39
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessConsumer.java
@@ -0,0 +1,108 @@
+package cn.iocoder.mall.pay.biz.mq;
+
+import cn.iocoder.common.framework.util.DateUtil;
+import cn.iocoder.common.framework.util.ExceptionUtil;
+import cn.iocoder.mall.pay.api.constant.PayTransactionNotifyStatusEnum;
+import cn.iocoder.mall.pay.biz.constant.MQConstant;
+import cn.iocoder.mall.pay.biz.dao.PayTransactionMapper;
+import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyLogMapper;
+import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyTaskMapper;
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyLogDO;
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
+import com.alibaba.dubbo.config.ApplicationConfig;
+import com.alibaba.dubbo.config.ReferenceConfig;
+import com.alibaba.dubbo.config.RegistryConfig;
+import com.alibaba.dubbo.rpc.service.GenericService;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Calendar;
+import java.util.Date;
+
+@Service
+@RocketMQMessageListener(
+        topic = MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
+        consumerGroup = "pay-consumer-group-" + MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS
+)
+public class PayTransactionPaySuccessConsumer implements RocketMQListener<PayTransactionPaySuccessMessage> {
+
+    @Autowired
+    private PayTransactionNotifyTaskMapper payTransactionNotifyTaskMapper;
+    @Autowired
+    private PayTransactionNotifyLogMapper payTransactionNotifyLogMapper;
+    @Autowired
+    private PayTransactionMapper payTransactionMapper;
+
+    @Override
+    @Transactional
+    public void onMessage(PayTransactionPaySuccessMessage message) {
+        // TODO 先简单写,后面重构
+
+        ApplicationConfig application = new ApplicationConfig();
+        application.setName("api-generic-consumer");
+
+        RegistryConfig registry = new RegistryConfig();
+        registry.setAddress("zookeeper://127.0.0.1:2181");
+
+        application.setRegistry(registry);
+
+        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
+        // 弱类型接口名
+        reference.setInterface("cn.iocoder.mall.pay.api.PayDemoService");
+        // 声明为泛化接口
+        reference.setGeneric(true);
+
+        reference.setApplication(application);
+
+        // 用com.alibaba.dubbo.rpc.service.GenericService可以替代所有接口引用
+        GenericService genericService = reference.get(); // TODO 芋艿,要缓存,不然重复引用
+
+        String response = null; // RPC / HTTP 调用的响应
+        PayTransactionNotifyTaskDO updateTask = new PayTransactionNotifyTaskDO() // 更新 PayTransactionNotifyTaskDO 对象
+                .setId(message.getId())
+                .setLastExecuteTime(new Date())
+                .setNotifyTimes(message.getNotifyTimes() + 1);
+        try {
+            response = (String) genericService.$invoke("updatePaySuccess", new String[]{String.class.getName()}, new Object[]{message.getOrderId()});
+            if ("success".equals(response)) { // 情况一,请求成功且返回成功
+                // 更新通知成功
+                updateTask.setStatus(PayTransactionNotifyStatusEnum.SUCCESS.getValue());
+                payTransactionNotifyTaskMapper.update(updateTask);
+                // 需要更新支付交易单通知应用成功
+                PayTransactionDO updateTransaction = new PayTransactionDO().setId(message.getTransactionId())
+                        .setFinishTime(new Date());
+                payTransactionMapper.update(updateTransaction, null);
+            } else { // 情况二,请求成功且返回失败
+                // 更新通知请求成功,但是结果失败
+                handleFailure(updateTask, PayTransactionNotifyStatusEnum.REQUEST_SUCCESS.getValue());
+                payTransactionNotifyTaskMapper.update(updateTask);
+            }
+        } catch (Throwable e) { // 请求失败
+            // 更新通知请求失败
+            response = ExceptionUtil.getRootCauseMessage(e);
+            handleFailure(updateTask, PayTransactionNotifyStatusEnum.REQUEST_FAILURE.getValue());
+            payTransactionNotifyTaskMapper.update(updateTask);
+            // 抛出异常,回滚事务
+            throw e;
+        } finally {
+            // 插入 PayTransactionNotifyLogDO 日志
+            PayTransactionNotifyLogDO notifyLog = new PayTransactionNotifyLogDO().setNotifyId(message.getId())
+                    .setRequest(message.getOrderId()).setResponse(response).setStatus(updateTask.getStatus());
+            payTransactionNotifyLogMapper.insert(notifyLog);
+        }
+    }
+
+    private void handleFailure(PayTransactionNotifyTaskDO updateTask, Integer defaultStatus) {
+        if (updateTask.getNotifyTimes() >= PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY.length) {
+            updateTask.setStatus(PayTransactionNotifyStatusEnum.FAILURE.getValue());
+        } else {
+            updateTask.setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY[updateTask.getNotifyTimes()]));
+            updateTask.setStatus(defaultStatus);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessMessage.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessMessage.java
new file mode 100644
index 00000000..e83cbce9
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/PayTransactionPaySuccessMessage.java
@@ -0,0 +1,91 @@
+package cn.iocoder.mall.pay.biz.mq;
+
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
+
+/**
+ * {@link cn.iocoder.mall.pay.biz.constant.MQConstant#TOPIC_PAY_TRANSACTION_PAY_SUCCESS} 的消息对象
+ */
+public class PayTransactionPaySuccessMessage {
+
+    /**
+     * 编号,自增
+     */
+    private Integer id;
+    /**
+     * 交易编号
+     *
+     * {@link PayTransactionDO#getId()}
+     */
+    private Integer transactionId;
+    /**
+     * 应用编号
+     */
+    private String appId;
+    /**
+     * 应用订单编号
+     */
+    private String orderId;
+    /**
+     * 当前通知次数
+     */
+    private Integer notifyTimes;
+    /**
+     * 通知地址
+     */
+    private String notifyUrl;
+
+    public Integer getId() {
+        return id;
+    }
+
+    public PayTransactionPaySuccessMessage setId(Integer id) {
+        this.id = id;
+        return this;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public PayTransactionPaySuccessMessage setAppId(String appId) {
+        this.appId = appId;
+        return this;
+    }
+
+    public String getOrderId() {
+        return orderId;
+    }
+
+    public PayTransactionPaySuccessMessage setOrderId(String orderId) {
+        this.orderId = orderId;
+        return this;
+    }
+
+    public Integer getNotifyTimes() {
+        return notifyTimes;
+    }
+
+    public PayTransactionPaySuccessMessage setNotifyTimes(Integer notifyTimes) {
+        this.notifyTimes = notifyTimes;
+        return this;
+    }
+
+    public String getNotifyUrl() {
+        return notifyUrl;
+    }
+
+    public PayTransactionPaySuccessMessage setNotifyUrl(String notifyUrl) {
+        this.notifyUrl = notifyUrl;
+        return this;
+    }
+
+    public Integer getTransactionId() {
+        return transactionId;
+    }
+
+    public PayTransactionPaySuccessMessage setTransactionId(Integer transactionId) {
+        this.transactionId = transactionId;
+        return this;
+    }
+
+}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/package-info.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/package-info.java
new file mode 100644
index 00000000..a8c2c99b
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/mq/package-info.java
@@ -0,0 +1 @@
+package cn.iocoder.mall.pay.biz.mq;
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayNotifyAppJob.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayNotifyAppJob.java
deleted file mode 100644
index 2d0a98b1..00000000
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayNotifyAppJob.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package cn.iocoder.mall.pay.biz.scheduler;
-
-import com.xxl.job.core.biz.model.ReturnT;
-import com.xxl.job.core.handler.IJobHandler;
-import com.xxl.job.core.handler.annotation.JobHandler;
-import org.springframework.stereotype.Component;
-
-/**
- * TODO
- */
-@Component
-@JobHandler(value = "payNotifyAppJob")
-public class PayNotifyAppJob extends IJobHandler {
-
-    @Override
-    public ReturnT<String> execute(String param) throws Exception {
-        System.out.println("1");
-        return null;
-    }
-
-    // TODO 需要考虑下是基于 MQ 还是 Job
-    // TODO 通知频率
-    // TODO rpc 泛化回调
-
-}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayTransactionNotifyJob.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayTransactionNotifyJob.java
new file mode 100644
index 00000000..358ece94
--- /dev/null
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/scheduler/PayTransactionNotifyJob.java
@@ -0,0 +1,51 @@
+package cn.iocoder.mall.pay.biz.scheduler;
+
+import cn.iocoder.mall.pay.biz.constant.MQConstant;
+import cn.iocoder.mall.pay.biz.convert.PayTransactionConvert;
+import cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyTaskMapper;
+import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.IJobHandler;
+import com.xxl.job.core.handler.annotation.JobHandler;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 支付交易成功通知 Job
+ */
+@Component
+@JobHandler(value = "payTransactionNotifyJob")
+public class PayTransactionNotifyJob extends IJobHandler {
+
+    @Autowired
+    private PayTransactionNotifyTaskMapper payTransactionNotifyTaskMapper;
+
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
+    @Override
+    public ReturnT<String> execute(String param) {
+        // 获得需要通知的任务
+        List<PayTransactionNotifyTaskDO> notifyTasks = payTransactionNotifyTaskMapper.selectByNotify();
+        // 循环任务,发送通知
+        for (PayTransactionNotifyTaskDO payTransactionNotifyTask : notifyTasks) {
+            // 发送 MQ
+            rocketMQTemplate.convertAndSend(MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
+                    PayTransactionConvert.INSTANCE.convert(payTransactionNotifyTask));
+            // 更新最后通知时间
+            // 1. 这样操作,虽然可能会出现 MQ 消费快于下面 PayTransactionNotifyTaskDO 的更新语句。但是,因为更新字段不同,所以不会有问题。
+            // 2. 换个视角,如果先更新 PayTransactionNotifyTaskDO ,再发送 MQ 消息。如果 MQ 消息发送失败,则 PayTransactionNotifyTaskDO 再也不会被轮询到了。
+            // 3. 当然,最最最完美的话,就是做事务消息,不过这样又过于复杂~
+            PayTransactionNotifyTaskDO updateNotifyTask = new PayTransactionNotifyTaskDO()
+                    .setId(payTransactionNotifyTask.getId()).setLastExecuteTime(new Date());
+            payTransactionNotifyTaskMapper.update(updateNotifyTask);
+        }
+        return new ReturnT<>("执行通知数:" + notifyTasks.size());
+    }
+
+}
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayDemoServiceImpl.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayDemoServiceImpl.java
index 28957e51..7a03dc6c 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayDemoServiceImpl.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayDemoServiceImpl.java
@@ -9,7 +9,8 @@ public class PayDemoServiceImpl implements PayDemoService {
 
     @Override
     public String updatePaySuccess(String orderId) {
-        return "你好呀";
+//        return "你好呀";
+        return "success";
     }
 
 }
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayServiceImpl.java b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayServiceImpl.java
index 21031024..614aec1d 100644
--- a/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayServiceImpl.java
+++ b/pay/pay-service-impl/src/main/java/cn/iocoder/mall/pay/biz/service/PayServiceImpl.java
@@ -15,6 +15,7 @@ import cn.iocoder.mall.pay.api.dto.PayTransactionSubmitDTO;
 import cn.iocoder.mall.pay.biz.client.AbstractPaySDK;
 import cn.iocoder.mall.pay.biz.client.PaySDKFactory;
 import cn.iocoder.mall.pay.biz.client.TransactionPaySuccessBO;
+import cn.iocoder.mall.pay.biz.constant.MQConstant;
 import cn.iocoder.mall.pay.biz.convert.PayTransactionConvert;
 import cn.iocoder.mall.pay.biz.dao.PayTransactionExtensionMapper;
 import cn.iocoder.mall.pay.biz.dao.PayTransactionMapper;
@@ -23,12 +24,15 @@ import cn.iocoder.mall.pay.biz.dataobject.PayAppDO;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionDO;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionExtensionDO;
 import cn.iocoder.mall.pay.biz.dataobject.PayTransactionNotifyTaskDO;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Resource;
+import java.util.Calendar;
 import java.util.Date;
 
 @Service
@@ -46,6 +50,9 @@ public class PayServiceImpl implements PayTransactionService {
     @Autowired
     private PayAppServiceImpl payAppService;
 
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
     @Override
     @SuppressWarnings("Duplicates")
     public CommonResult<PayTransactionBO> createTransaction(PayTransactionCreateDTO payTransactionCreateDTO) {
@@ -158,14 +165,18 @@ public class PayServiceImpl implements PayTransactionService {
         if (updateCounts == 0) { // 校验状态,必须是待支付 TODO 这种类型,需要思考下。需要返回错误,但是又要保证事务回滚
             throw ServiceExceptionUtil.exception(PayErrorCodeEnum.PAY_TRANSACTION_STATUS_IS_NOT_WAITING.getCode());
         }
-        // 3. 插入
+        // 3.1 插入
         PayTransactionNotifyTaskDO payTransactionNotifyTask = new PayTransactionNotifyTaskDO()
                 .setTransactionId(payTransactionExtension.getTransactionId()).setTransactionExtensionId(payTransactionExtension.getId())
                 .setAppId(payTransactionDO.getAppId()).setOrderId(payTransactionDO.getOrderId())
                 .setStatus(PayTransactionNotifyStatusEnum.WAITING.getValue())
-                .setNotifyTimes(0).setMaxNotifyTimes(5)
+                .setNotifyTimes(0).setMaxNotifyTimes(PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY.length + 1)
+                .setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayTransactionNotifyTaskDO.NOTIFY_FREQUENCY[0]))
                 .setNotifyUrl(payTransactionDO.getNotifyUrl());
         payTransactionNotifyTaskMapper.insert(payTransactionNotifyTask);
+        // 3.2 发送 MQ
+        rocketMQTemplate.convertAndSend(MQConstant.TOPIC_PAY_TRANSACTION_PAY_SUCCESS,
+                PayTransactionConvert.INSTANCE.convert(payTransactionNotifyTask));
         // 返回结果
         return CommonResult.success(true);
     }
diff --git a/pay/pay-service-impl/src/main/resources/config/application.yaml b/pay/pay-service-impl/src/main/resources/config/application.yaml
index 2085220c..de2fd3e3 100644
--- a/pay/pay-service-impl/src/main/resources/config/application.yaml
+++ b/pay/pay-service-impl/src/main/resources/config/application.yaml
@@ -25,7 +25,6 @@ dubbo:
     base-packages: cn.iocoder.mall.pay.biz.service
 
 # xxl-job
-
 xxl:
   job:
     admin:
@@ -36,4 +35,10 @@ xxl:
       port: 0
       logpath: /Users/yunai/logs/xxl-job/
       logretentiondays: 1
-    accessToken:
\ No newline at end of file
+    accessToken:
+
+# rocketmq
+rocketmq:
+  name-server: 127.0.0.1:9876
+  producer:
+    group: pay-producer-group
diff --git a/pay/pay-service-impl/src/main/resources/mapper/PayTransactionMapper.xml b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionMapper.xml
index 661b8c59..d280d6de 100644
--- a/pay/pay-service-impl/src/main/resources/mapper/PayTransactionMapper.xml
+++ b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionMapper.xml
@@ -38,6 +38,9 @@
             <if test="entity.paymentTime != null">
                 , payment_time = #{entity.paymentTime}
             </if>
+            <if test="entity.finishTime != null">
+                , finish_time = #{entity.finishTime}
+            </if>
             <if test="entity.notifyTime != null">
                 , notify_time = #{entity.notifyTime}
             </if>
diff --git a/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyLogMapper.xml b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyLogMapper.xml
new file mode 100644
index 00000000..6c09d697
--- /dev/null
+++ b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyLogMapper.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="cn.iocoder.mall.pay.biz.dao.PayTransactionNotifyLogMapper">
+
+    <!--<sql id="FIELDS">-->
+        <!--id, transaction_id, transaction_extension_id, app_id, order_id,-->
+        <!--status, next_notify_time, last_execute_time, notify_times, max_notify_times,-->
+        <!--create_time-->
+    <!--</sql>-->
+
+    <insert id="insert" parameterType="PayTransactionNotifyLogDO" useGeneratedKeys="true" keyColumn="id" keyProperty="id">
+        INSERT INTO transaction_notify_log (
+            notify_id, request, response, status
+        ) VALUES (
+            #{notifyId}, #{request}, #{response}, #{status}
+        )
+    </insert>
+
+    <!--<update id="update" parameterType="PayTransactionNotifyTaskDO">-->
+        <!--UPDATE transaction_notify_task-->
+        <!--<set>-->
+            <!--<if test="status != null">-->
+                <!--, status = #{status}-->
+            <!--</if>-->
+            <!--<if test="nextNotifyTime != null">-->
+                <!--, last_notify_time = #{nextNotifyTime}-->
+            <!--</if>-->
+            <!--<if test="lastExecuteTime != null">-->
+                <!--, last_execute_time = #{lastExecuteTime}-->
+            <!--</if>-->
+            <!--<if test="notifyTimes != null">-->
+                <!--, notify_times = #{notifyTimes}-->
+            <!--</if>-->
+        <!--</set>-->
+        <!--WHERE id = #{id}-->
+    <!--</update>-->
+
+    <!--<select id="selectByTransactionCode" parameterType="String" resultType="PayTransactionExtensionDO">-->
+        <!--SELECT-->
+        <!--<include refid="FIELDS"/>-->
+        <!--FROM transaction_extension-->
+        <!--WHERE transaction_code = #{transactionCode}-->
+        <!--LIMIT 1-->
+    <!--</select>-->
+
+</mapper>
\ No newline at end of file
diff --git a/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyTaskMapper.xml b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyTaskMapper.xml
index aa551842..656631c0 100644
--- a/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyTaskMapper.xml
+++ b/pay/pay-service-impl/src/main/resources/mapper/PayTransactionNotifyTaskMapper.xml
@@ -4,16 +4,17 @@
 
     <sql id="FIELDS">
         id, transaction_id, transaction_extension_id, app_id, order_id,
-        status, last_Notify_time, notify_times, max_notify_times, create_time
+        status, next_notify_time, last_execute_time, notify_times, max_notify_times,
+        create_time
     </sql>
 
     <insert id="insert" parameterType="PayTransactionNotifyTaskDO" useGeneratedKeys="true" keyColumn="id" keyProperty="id">
         INSERT INTO transaction_notify_task (
             transaction_id, transaction_extension_id, app_id, order_id,
-            status, last_notify_time, notify_times, max_notify_times
+            status, next_notify_time, notify_times, max_notify_times
         ) VALUES (
             #{transactionId}, #{transactionExtensionId}, #{appId}, #{orderId},
-            #{status}, #{lastNotifyTime}, #{notifyTimes}, #{maxNotifyTimes}
+            #{status}, #{nextNotifyTime}, #{notifyTimes}, #{maxNotifyTimes}
         )
     </insert>
 
@@ -23,8 +24,11 @@
             <if test="status != null">
                 , status = #{status}
             </if>
-            <if test="lastNotifyTime != null">
-                , last_notify_time = #{lastNotifyTime}
+            <if test="nextNotifyTime != null">
+                , next_notify_time = #{nextNotifyTime}
+            </if>
+            <if test="lastExecuteTime != null">
+                , last_execute_time = #{lastExecuteTime}
             </if>
             <if test="notifyTimes != null">
                 , notify_times = #{notifyTimes}
@@ -33,12 +37,13 @@
         WHERE id = #{id}
     </update>
 
-    <!--<select id="selectByTransactionCode" parameterType="String" resultType="PayTransactionExtensionDO">-->
-        <!--SELECT-->
-        <!--<include refid="FIELDS"/>-->
-        <!--FROM transaction_extension-->
-        <!--WHERE transaction_code = #{transactionCode}-->
-        <!--LIMIT 1-->
-    <!--</select>-->
+    <select id="selectByNotify" resultType="PayTransactionNotifyTaskDO">
+        SELECT
+            <include refid="FIELDS"/>
+        FROM transaction_notify_task
+        WHERE status IN (1, 3, 4, 5)
+        AND next_notify_time <![CDATA[ <= ]]> NOW()
+        AND last_execute_time > next_notify_time
+    </select>
 
 </mapper>
\ No newline at end of file
-- 
2.17.1