目录
RocketMq消息处理1. 处理PULL_MESSAGE请求2. 获取消息3. 挂起请求:PullRequestHoldService#suspendPullRequest3.1 处理挂起请求的线程:PullRequestHoldService3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup3.3 消息分发中唤醒consumer请求总结RocketMq消息处理
RocketMq
消息处理整个流程如下:
(资料图片仅供参考)
本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
消息接收:消息接收是指接收producer
的消息,处理类是SendMessageProcessor
,将消息写入到commigLog
文件后,接收流程处理完毕;消息分发:broker
处理消息分发的类是ReputMessageService
,它会启动一个线程,不断地将commitLong
分到到对应的consumerQueue
,这一步操作会写两个文件:consumerQueue
与indexFile
,写入后,消息分发流程处理 完毕;消息投递:消息投递是指将消息发往consumer
的流程,consumer
会发起获取消息的请求,broker
收到请求后,调用PullMessageProcessor
类处理,从consumerQueue
文件获取消息,返回给consumer
后,投递流程处理完毕。
以上就是rocketMq
处理消息的流程了,接下来我们就从源码来分析消息投递的实现。
1. 处理PULL_MESSAGE请求
与producer
不同,consumer
从broker
拉取消息时,发送的请求code
为PULL_MESSAGE
,processor
为PullMessageProcessor
,我们直接进入它的processRequest
方法:
@Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { // 调用方法 return this.processRequest(ctx.channel(), request, true); }
这个方法就只是调用了一个重载方法,多出来的参数true
表示允许broker
挂起请求,我们继续,
/** * 继续处理 */ private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException { RemotingCommand response = RemotingCommand .createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); response.setOpaque(request.getOpaque()); // 省略权限校验流程 // 1. rocketMq 可以设置校验信息,以阻挡非法客户端的连接 // 2. 同时,对topic可以设置DENY(拒绝)、ANY(PUB 或者 SUB 权限)、PUB(发送权限)、SUB(订阅权限)等权限, // 可以细粒度控制客户端对topic的操作内容 ... // 获取订阅组 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager() .findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); ... // 获取订阅主题 TopicConfig topicConfig = this.brokerController.getTopicConfigManager() .selectTopicConfig(requestHeader.getTopic()); ... // 处理filter // consumer在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag与sql92 // 这里我们重点关注拉取消息的流程,具体的过滤细节后面再分析 ... // 获取消息 // 1. 根据 topic 与 queueId 获取 ConsumerQueue 文件 // 2. 根据 ConsumerQueue 文件的信息,从 CommitLog 中获取消息内容 final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage( requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter); if (getMessageResult != null) { // 省略一大堆的校验过程 ... switch (response.getCode()) { // 表示消息可以处理,这里会把消息内容写入到 response 中 case ResponseCode.SUCCESS: ... // 处理消息消息内容,就是把消息从 getMessageResult 读出来,放到 response 中 if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { final long beginTimeMills = this.brokerController.getMessageStore().now(); // 将消息内容转为byte数组 final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId()); ... response.setBody(r); } else { try { // 消息转换 FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader( getMessageResult.getBufferTotalSize()), getMessageResult); channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { ... }); } catch (Throwable e) { ... } response = null; } break; // 未找到满足条件的消息 case ResponseCode.PULL_NOT_FOUND: // 如果支持挂起,就挂起当前请求 if (brokerAllowSuspend && hasSuspendFlag) { ... PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); // 没有找到相关的消息,挂起操作 this.brokerController.getPullRequestHoldService() .suspendPullRequest(topic, queueId, pullRequest); response = null; break; } // 省略其他类型的处理 ... break; default: assert false; } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store getMessage return null"); } ... return response; }
在源码中,这个方法也是非常长,这里我抹去了各种细枝末节,仅留下了一些重要的流程,整个处理流程如下:
权限校验:rocketMq
可以设置校验信息,以阻挡非法客户端的连接,同时也可以设置客户端的发布、订阅权限,细节度控制访问权限;获取订阅组、订阅主题等,这块主要是通过请求消息里的内容获取broker
中对应的记录创建过滤组件:consumer
在订阅消息时,可以对订阅的消息进行过滤,过滤方法有两种:tag
与sql92
获取消息:先是根据 topic
与 queueId
获取 ConsumerQueue
文件,根据 ConsumerQueue
文件的信息,从 CommitLog
中获取消息内容,消息的过滤操作也是发生在这一步转换消息:如果获得了消息,就是把具体的消息内容,复制到reponse
中挂起请求:如果没获得消息,而当前请求又支持挂起,就挂起当前请求
以上代码还是比较清晰的,相关流程代码中都作了注释。
以上流程就是整个消息的获取流程了,在本文中,我们仅关注与获取消息相关的步骤,重点关注以下两个操作:
获取消息挂起请求2. 获取消息
获取消息的方法为DefaultMessageStore#getMessage
,代码如下:
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // 省略一些判断 ... // 根据topic与queueId一个ConsumeQueue,consumeQueue记录的是消息在commitLog的位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (...) { // 判断 offset 是否符合要求 ... } else { // 从 consumerQueue 文件中获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { ... for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 省略一大堆的消息过滤操作 ... // 从 commitLong 获取消息 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 省略一大堆的消息过滤操作 ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } if (GetMessageStatus.FOUND == status) { this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); } else { this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); } long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); getResult.setStatus(status); // 又是处理 offset getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); return getResult; }
这个方法不是比较长的,这里仅保留了关键流程,获取消息的关键流程如下:
根据topic
与queueId
找到ConsumerQueue
从ConsumerQueue
对应的文件中获取消息信息,如tag
的hashCode
、消息在commitLog
中的位置信息根据位置信息,从commitLog
中获取完整的消息
经过以上步骤,消息就能获取到了,不过在获取消息的前后,会进行消息过滤操作,即根据tag
或sql
语法来过滤消息,关于消息过滤的一些细节,我们留到后面消息过滤相关章节作进一步分析。
3. 挂起请求:PullRequestHoldService#suspendPullRequest
当broker
无新消息时,consumer
拉取消息的请求就会挂起,方法为PullRequestHoldService#suspendPullRequest
:
public class PullRequestHoldService extends ServiceThread { private ConcurrentMappullRequestTable = new ConcurrentHashMap (1024); public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (null == mpr) { mpr = new ManyPullRequest(); ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr); if (prev != null) { mpr = prev; } } mpr.addPullRequest(pullRequest); } ... }
在suspendPullRequest
方法中,所做的工作仅是把当前请求放入pullRequestTable
中了。从代码中可以看到,pullRequestTable
是一个ConcurrentMap
,key
是 topic@queueId
,value
就是挂起的请求了。
请求挂起后,何时处理呢?这就是PullRequestHoldService
线程的工作了。
3.1 处理挂起请求的线程:PullRequestHoldService
看完PullRequestHoldService#suspendPullRequest
方法后,我们再来看看PullRequestHoldService
。
PullRequestHoldService
是ServiceThread
的子类(上一次看到ServiceThread
的子类还是ReputMessageService
),它也会启动一个新线程来处理挂起操作。
我们先来看看它是在哪里启动PullRequestHoldService
的线程的,在BrokerController
的启动方法start()
中有这么一行:
BrokerController#start
public void start() throws Exception { ... if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } ... }
这里就是启动pullRequestHoldService
的线程操作了。
为了探究这个线程做了什么,我们进入PullRequestHoldService#run
方法:
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { // 等待中 if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning( this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 检查操作 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
从代码来看,这个线程先是进行等待,然后调用PullRequestHoldService#checkHoldRequest
方法,看来关注就是这个方法了,它的代码如下:
private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); try { // 调用notifyMessageArriving方法操作 this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error(...); } } } }
这个方法调用了PullRequestHoldService#notifyMessageArriving(...)
,我们继续进入:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) { // 继续调用 notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null); } /** * 这个方法就是最终调用的了 */ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Mapproperties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { List requestList = mpr.cloneListAndClear(); if (requestList != null) { List replayList = new ArrayList (); for (PullRequest request : requestList) { // 判断是否有新消息到达,要根据 comsumerQueue 的偏移量与request的偏移量判断 long newestOffset = maxOffset; if (newestOffset <= request.getPullFromThisOffset()) { newestOffset = this.brokerController.getMessageStore() .getMaxOffsetInQueue(topic, queueId); } if (newestOffset > request.getPullFromThisOffset()) { boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode, new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); if (match && properties != null) { match = request.getMessageFilter().isMatchedByCommitLog(null, properties); } if (match) { try { // 唤醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } } // 超时时间到了 if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { // 唤醒操作 this.brokerController.getPullMessageProcessor() .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); } continue; } replayList.add(request); } if (!replayList.isEmpty()) { mpr.addPullRequest(replayList); } } } }
这个方法就是用来检查是否有新消息送达的操作了,方法虽然有点长,但可以用一句话来总结:如果有新消息送达,或者pullRquest
hold
住的时间到了,就唤醒pullRquest
(即调用PullMessageProcessor#executeRequestWhenWakeup
方法)。
comsumerQueue
文件中的最大偏移量,与当前pullRquest
中的偏移量进行比较,如果前者大,就表示有新消息送达了,需要唤醒pullRquest
前面说过,当consumer
请求没获取到消息时,broker
会hold
这个请求一段时间(30s),当这个时间到了,也会唤醒pullRquest
,之后就不会再hold
住它了
3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup
我们再来看看 PullMessageProcessor#executeRequestWhenWakeup
方法:
public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { // 关注 Runnable#run() 方法即可 Runnable run = new Runnable() { @Override public void run() { try { // 再一次调用 PullMessageProcessor#processRequest(...) 方法 final RemotingCommand response = PullMessageProcessor.this .processRequest(channel, request, false); ... } catch (RemotingCommandException e1) { log.error("excuteRequestWhenWakeup run", e1); } } }; // 提交任务 this.brokerController.getPullMessageExecutor() .submit(new RequestTask(run, channel, request)); }
这个方法准备了一个任务,然后将其提交到线程池中执行,任务内容很简单,仅是调用了PullMessageProcessor#processRequest(...)
方法,这个方法就是本节一始提到的处理consumer
拉取消息的方法了。
3.3 消息分发中唤醒consumer请求
在分析消息分发流程时,DefaultMessageStore.ReputMessageService#doReput
方法中有这么一段:
private void doReput() { ... // 分发消息 DefaultMessageStore.this.doDispatch(dispatchRequest); // 长轮询:如果有消息到了主节点,并且开启了长轮询 if (BrokerRole.SLAVE != DefaultMessageStore.this .getMessageStoreConfig().getBrokerRole() &&DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){ // 调用NotifyMessageArrivingListener的arriving方法 DefaultMessageStore.this.messageArrivingListener.arriving( dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } ... }
这段就是用来主动唤醒hold
住的consumer
请求的,我们进入NotifyMessageArrivingListener#arriving
方法:
@Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Mapproperties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); }
最终它也是调用了 PullRequestHoldService#notifyMessageArriving(...)
方法。
总结
本文主要分析了broker
处理PULL_MESSAGE
请求的流程,总结如下:
broker
处理PULL_MESSAGE
的processor
为PullMessageProcessor
,PullMessageProcessor
的processRequest(...)
就是整个消息获取流程了broker
在获取消息时,先根据请求的topic
与queueId
找到consumerQueue
,然后根据请求中的offset
参数从consumerQueue
文件中找到消息在commitLog
的位置信息,最后根据位置信息从commitLog
中获取消息内容如果broker
中没有当前consumerQueue
的消息,broker
会挂起当前线程,直到超时(默认30s)或收到新的消息时再唤醒
参考
RocketMQ源码分析专栏
以上就是RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析的详细内容,更多关于RocketMQ broker 消息投递的资料请关注脚本之家其它相关文章!
-
全球速读:RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析这篇文章主要为大家介绍了RocketMQ broker 消息投递流程处理PULL_MESSAGE请求源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝
-
快速美白牙齿小妙招_日常美白小妙招1、 对于肌肤美白是许多女性朋友不变的追求,大家知道我们日常可以怎么美白吗?下面小编为大家整理了日常美白小妙招,欢迎阅
-
地方一季度发债已超2万亿元,钱花哪儿了?|环球速读为了稳经济,一季度地方政府举债已经超过2万亿元。目前,地方政府唯一合法举债渠道是发行地方政府债券。根据公开数据,今年一季
-
星星的礼物第3季_视点星星的礼物第3季,星云:星上有星星在水底的星点缀一些许多语,星上面夹杂志的是星巴,星上面写着“妈妈的爱”。”我们的感谢”
-
全球微头条丨“五个一百”里看乡村振兴“兴农人”团结奋进新征程,同心奋斗创伟业。“五个一百”自开展以来,让更多“兴农人”的正能量故事奔涌而出。有组建“魔法实验室”的90后海归用科技...
-
不降价反倒卖得更好?新势力3月销量排行出炉,理想、蔚来夺冠亚|世界关注原本以为3月份的降价潮会带动汽车市场消费,但从头部新势力品牌公布的销量成绩来看,降价并没有带来销量上的增长,反倒是部分坚持不降价的新势
-
每日消息!深天马A:两子公司共获政府补助资金1.78亿元深天马A4月3日公告,近日,厦门天马收到厦门火炬高技术产业开发区管理委员会《关于拨付厦门天马微电子有限公司研发支持补贴资
-
儿童型抑郁症怎么治疗 儿童型抑郁症会自愈吗?儿童型抑郁症怎么治疗儿童抑郁症一般是指抑郁症发生在6岁到12岁的儿童。患儿表现没有以前高兴了,对什么也提不起兴趣,言语和活动减少,常感没
-
一季度西部陆海新通道铁海联运班列发送集装箱货物量同比增长11.7% 天天热点记者从中国铁路南宁局集团有限公司获悉,今年1月至3月,西部陆海新通道铁海联运班列发送集装箱货物19 1万标箱,同比增长11 7%。
-
4月3日东明石化硫磺报价暂稳-世界聚看点4月3日,山东东明石化集团硫磺报价暂稳,固体硫磺报价在1050元 吨,液体硫磺价格在1000元 吨,4月2日企业调价,固液体硫磺同步下调50元 吨,目
-
身材管理风潮拉动代餐市场 紫薯切入代餐赛道掘金-当前看点随着生活水平提高,人们越发注重健康饮食和身材管理,正所谓“春日不减肥,夏日徒悲伤”,随着夏天的脚步临近,许多爱美人士,开启减重之路...
-
北股交科技创新板“纳新”记“挂牌北股交、成为科技创新板企业,是我们迈向资本市场的第一步。”刚完成PreA轮融资的北京中科海势科技有限公司(中科海势
-
枪击电池?埃安弹匣电池2.0的安全测试从针刺到枪击,广汽埃安换着法子来折磨车底下的电池包,对此,我举双手赞成,电池包在试验室里被虐的越狠,在我们屁股底下的时候也就越安分。
-
醴陵:集思广益,打响“理响瓷城”宣讲品牌-环球新消息红网时刻新闻4月3日讯(通讯员 阳慧 王力鑫)3月29日,醴陵召开“
-
25分大胜!浓眉40+9老詹三双,替补20+12早该被重用,哈姆2事该批 世界快资讯25分大胜!浓眉40+9老詹三双,替补20+12早该被重用,哈姆2事该批,湖人,火箭,老詹,里夫斯,瑞典足球,米娅·哈姆,勒布朗詹姆斯,浓眉40+9,国际足球
-
[异常波动]净源科技(836674):股票交易异常波动公告|环球简讯证券代码:836674证券简称:净源科技主办券商:甬兴证券浙江净源膜科技股份有限公司股票交易异常波动公告本公司及董事会全体
-
河南漯河消协发布警示:慎防玻尿酸注射风险针对当下消费者注射玻尿酸后屡屡出现美容事故的风险,3月28日,河南省漯河市消费者协会发布警示,提醒消费者慎防玻尿酸注射的美容风险。玻尿酸
-
“天大·海棠季”校园开放日举行 8万人共赴天大“海棠之约”天津北方网讯:海棠花开,一簇簇一树树的胭脂绯红;校园里,朝阳般的笑容绽放开来。阔别多年的校友、曾把孩子送至校园门口不得不转身离开的家
-
中金:稀土总量控制指标落地 稀土配置价值有望回归【中金:稀土总量控制指标落地稀土配置价值有望回归】3月24日,工信部和自然资源部下达2023年第一批稀土开采、冶炼分离总量控制指标,其中矿产
-
三省五地旅游一体化活动启动|天天百事通3月31日,三省五地旅游一体化暨“金武张海阿”文旅惠民活动启动仪式在金昌举行。现场正式发布了金武张海阿文旅惠民卡,该卡已签约五个市州...
-
当前要闻:2022年公募整体亏损1.45万亿元截至2023年3月31日,公募基金2022年年报披露完毕。天相统计数据显示,在市场行情跌宕起伏的2022年,公募基金整体亏损达1 45万亿元左右,其中权
-
农业银行行长付万军:强化定价精细化管理,努力稳住净息差水平 焦点快报每经记者张寿林每经编辑廖丹3月31日下午,中国农业银行(601288)召开业绩发布会。根据农业银行2022年业绩报告,2
-
比赛日信息:国际米兰vs佛罗伦萨(意甲联赛第28...比赛日信息:国际米兰vs佛罗伦萨(意甲联赛第28轮)比赛时间:4月2日0点比赛直播:咪咕视频(不要银子)比赛数据:双方近13场各项赛事交手,国
-
孟晚舟首次成为华为最高领袖:我们向死而生,怎么能不成仁?【文 观察者网吕栋】“身处暴风雨中”、“努力保障业务连续”、“经营依然面临较大压力” 3月31日下午,两位华
-
全球快资讯丨西媒:F-门迪的伤病体质令队医困惑,皇马考虑今夏出售直播吧4月1日讯皇马官方在此前宣布边后卫F-门迪遭遇左腿比目鱼肌伤病,西媒《relevo》透露皇马已经对他失去耐心。每当门迪受伤,皇马队医们都
-
在美陷关停风险,SHEIN的麻烦大了_环球播报作者|麻吉编辑|宋函中国出海APP正在迎接一场新的风暴。3月26日,有消息称中国跨境快时尚品牌Shein或面临在美国被关闭风险。美国民间机构ShutD
-
吕蒙字子明汝南富陂人也文言文翻译_吕蒙字子明汝南富陂人也翻译1、吕蒙字子明汝南富波人也阅读答案及译文吕蒙字子明,汝南富波人也。2、少南渡,依妹夫邓当。3、当为孙策将,数讨山
-
怎样做蛋糕_怎样制作蛋糕|今日要闻1、市或者淘宝上买蛋糕粉,每次用一小袋大约五十克,加一个蛋两匙油(用黄油最香,别的植物油色拉油花生油也是可以的)搅拌成糊
-
钼精矿价格跌至3000元/吨度-天天滚动数据显示,2月中下旬,国内钼精矿价格一度到达5500 吨度以上,创下了至少15年的新高
-
江苏徐州:铁路安全进校园_世界快播报江苏徐州:铁路安全进校园