问题描述
是否可以模拟 Eventbridge 模式匹配,通过将模式匹配的消息接收到 SQS 队列来确认?
问我有一个 boto3/moto 脚本(下面),它试图做到这一点,我相信它应该工作,但..它不是:/
我认为这可能属于“复杂的模拟 AWS 工作流程在云中的行为从不完全像他们在云中所做的那样”的标题,但我可能在某处犯了错误。
有人知道以这种方式测试 Eventbridge 模式匹配是否可行(或不可行),或者我做错了什么?
TIA
import boto3,json,unittest
from datetime import datetime
from moto import mock_events,mock_sqs
QueueName,RuleName = "hello-queue","hello-rule"
RuleTargetId,RuleEnabled = "hello-rule-target","ENABLED"
EventSource,EventDetailType,EventResource = "hello-event-source","hello-event-detail-type","event-resource"
@mock_events
@mock_sqs
class HelloTest(unittest.TestCase):
def setUp(self):
self.sqs,self.events = (boto3.client("sqs"),boto3.client("events"))
statement=[{"Effect": "Allow","Principal": {"Service": "events.amazonaws.com"},"Action": "sqs:SendMessage","Resource": "*"}]
policy=json.dumps({"Version": "2012-10-17","Statement": [statement]})
queue=self.sqs.create_queue(QueueName=QueueName,Attributes={"Policy": policy})
self.queueurl=queue["QueueUrl"]
queueattrs=self.sqs.get_queue_attributes(QueueUrl=self.queueurl)
self.queuearn=queueattrs["Attributes"]["QueueArn"]
eventpattern={"source": [EventSource],"detail": {"hello": ["world"]}}
self.rule=self.events.put_rule(Name=RuleName,State=RuleEnabled,EventPattern=json.dumps(eventpattern))
self.events.put_targets(Rule=RuleName,Targets=[{"Id": RuleTargetId,"Arn": self.queuearn}])
# test that simple SQS send/receive works in moto
def test_sqs_message(self):
reqbody=json.dumps({"hello": "world"})
self.sqs.send_message(QueueUrl=self.queueurl,MessageBody=reqbody)
sqsresp=self.sqs.receive_message(QueueUrl=self.queueurl)
self.assertTrue("Messages" in sqsresp)
messages=sqsresp["Messages"]
self.assertTrue(len(messages),1)
message=messages.pop()
body=json.loads(message["Body"])
self.assertTrue("hello" in body)
# test that more complex Eventbridge+SQS workflow works
def test_eventbridge_message(self):
detail=json.dumps({"hello": "world"})
entry={"Time": datetime.utcNow().strftime("%Y-%m-%dT%H:%M:%sZ"),"Source": EventSource,"Resources": [EventResource],"DetailType": EventDetailType,"Detail": detail}
self.events.put_events(Entries=[entry])
sqsresp=self.sqs.receive_message(QueueUrl=self.queueurl)
# self.assertTrue("Messages" in sqsresp) # <--- FAILS HERE
def tearDown(self):
self.sqs.delete_queue(QueueUrl=self.queueurl)
self.events.remove_targets(Rule=RuleName,Ids=[RuleTargetId])
self.events.delete_rule(Name=RuleName)
if __name__=="__main__":
unittest.main()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)