package com.leon.mq.rocketmq.order;import java.util.ArrayList;
import java.util.List;/*** 订单构建者*/
/**
 * One step in the lifecycle of an order: the id of the order it belongs to plus a
 * short textual description of the step.
 *
 * <p>{@link #buildOrders()} supplies a fixed, interleaved sample sequence of steps
 * for three different orders.
 */
public class OrderStep {

    private long orderId;
    private String desc;

    /**
     * @return the id of the order this step belongs to
     */
    public long getOrderId() {
        return orderId;
    }

    /**
     * @param orderId the id of the order this step belongs to
     */
    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    /**
     * @return the description of this step
     */
    public String getDesc() {
        return desc;
    }

    /**
     * @param desc the description of this step
     */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Renders the step as {@code OrderStep{orderId=<id>, desc='<desc>'}}.
     */
    @Override
    public String toString() {
        return "OrderStep{"
                + "orderId=" + orderId
                + ", desc='" + desc + '\''
                + '}';
    }

    /**
     * Builds the fixed sample list of order steps.
     *
     * <p>The steps of the three orders are interleaved with each other, but the steps
     * of any single order appear in their lifecycle order:
     * <ul>
     *   <li>1039: create, pay, push, complete</li>
     *   <li>1065: create, pay, complete</li>
     *   <li>7235: create, pay, complete</li>
     * </ul>
     *
     * @return a new mutable list holding the ten sample steps
     */
    public static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<>();

        orderList.add(newStep(1039L, "创建"));
        orderList.add(newStep(1065L, "创建"));
        orderList.add(newStep(1039L, "付款"));
        orderList.add(newStep(7235L, "创建"));
        orderList.add(newStep(1065L, "付款"));
        orderList.add(newStep(7235L, "付款"));
        orderList.add(newStep(1065L, "完成"));
        orderList.add(newStep(1039L, "推送"));
        orderList.add(newStep(7235L, "完成"));
        orderList.add(newStep(1039L, "完成"));

        return orderList;
    }

    /** Creates a step with the given order id and description. */
    private static OrderStep newStep(long orderId, String desc) {
        OrderStep step = new OrderStep();
        step.setOrderId(orderId);
        step.setDesc(desc);
        return step;
    }
}
package com.leon.mq.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.启动producerproducer.start();//构建消息集合List orderSteps = OrderStep.buildOrders();//发送消息for (int i = 0; i < orderSteps.size(); i++) {String body = orderSteps.get(i) + "";Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());/*** 参数一:消息对象* 参数二:消息队列的选择器* 参数三:选择队列的业务标识(订单ID)*/SendResult sendResult = producer.send(message, new MessageQueueSelector() {/**** @param mqs:队列集合* @param msg:消息对象* @param arg:业务标识的参数* @return*/@Overridepublic MessageQueue select(List mqs, Message msg, Object arg) {long orderId = (long) arg;long index = orderId % mqs.size();return mqs.get((int) index);}}, orderSteps.get(i).getOrderId());System.out.println("发送结果:" + sendResult);}producer.shutdown();}}
// 本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!