与 moto 匹配的模拟 Eventbridge 事件模式

问题描述

是否可以模拟 Eventbridge 模式匹配,通过将模式匹配的消息接收到 SQS 队列来确认?

我有一个 boto3/moto 脚本(下面),它试图做到这一点,我相信它应该工作,但..它不是:/

认为这可能属于“复杂的模拟 AWS 工作流程在云中的行为从不完全像他们在云中所做的那样”的标题,但我可能在某处犯了错误

有人知道以这种方式测试 Eventbridge 模式匹配是否可行(或不可行),或者我做错了什么?

TIA

import boto3,json,unittest

from datetime import datetime

from moto import mock_events,mock_sqs

QueueName,RuleName = "hello-queue","hello-rule"

RuleTargetId,RuleEnabled = "hello-rule-target","ENABLED"

EventSource,EventDetailType,EventResource = "hello-event-source","hello-event-detail-type","event-resource"

@mock_events
@mock_sqs
class HelloTest(unittest.TestCase):
    
    def setUp(self):
        self.sqs,self.events = (boto3.client("sqs"),boto3.client("events"))
        statement=[{"Effect": "Allow","Principal": {"Service": "events.amazonaws.com"},"Action": "sqs:SendMessage","Resource": "*"}]
        policy=json.dumps({"Version": "2012-10-17","Statement": [statement]})
        queue=self.sqs.create_queue(QueueName=QueueName,Attributes={"Policy": policy})
        self.queueurl=queue["QueueUrl"]
        queueattrs=self.sqs.get_queue_attributes(QueueUrl=self.queueurl)
        self.queuearn=queueattrs["Attributes"]["QueueArn"]
        eventpattern={"source": [EventSource],"detail": {"hello": ["world"]}}
        self.rule=self.events.put_rule(Name=RuleName,State=RuleEnabled,EventPattern=json.dumps(eventpattern))
        self.events.put_targets(Rule=RuleName,Targets=[{"Id": RuleTargetId,"Arn": self.queuearn}])

    # test that simple SQS send/receive works in moto
        
    def test_sqs_message(self):
        reqbody=json.dumps({"hello": "world"})
        self.sqs.send_message(QueueUrl=self.queueurl,MessageBody=reqbody)
        sqsresp=self.sqs.receive_message(QueueUrl=self.queueurl)
        self.assertTrue("Messages" in sqsresp)
        messages=sqsresp["Messages"]
        self.assertTrue(len(messages),1)
        message=messages.pop()
        body=json.loads(message["Body"])
        self.assertTrue("hello" in body)

    # test that more complex Eventbridge+SQS workflow works
        
    def test_eventbridge_message(self):
        detail=json.dumps({"hello": "world"})
        entry={"Time": datetime.utcNow().strftime("%Y-%m-%dT%H:%M:%sZ"),"Source": EventSource,"Resources": [EventResource],"DetailType": EventDetailType,"Detail": detail}
        self.events.put_events(Entries=[entry])
        sqsresp=self.sqs.receive_message(QueueUrl=self.queueurl)
        # self.assertTrue("Messages" in sqsresp) # <--- FAILS HERE
  
    def tearDown(self):
        self.sqs.delete_queue(QueueUrl=self.queueurl)
        self.events.remove_targets(Rule=RuleName,Ids=[RuleTargetId])
        self.events.delete_rule(Name=RuleName)
        
if __name__=="__main__":
    unittest.main()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)