问题描述
我正在使用下面的查询。我通过事件中心从推送 API 接收 JSON 格式的消息,JSON 消息中包含一些数组。现在我收到如下错误:“我们无法连接到事件中心分区 [0],因为已达到使用者组中每个分区允许的最大接收者数量。请确保其他 Stream Analytics 作业或 Service Bus Explorer 没有使用同一个使用者组。以下信息可能有助于识别已连接的接收者:超出了使用者组中每个分区允许的最大接收者数 5”。
我希望输出到Azure表存储,只要查询正常运行即可。
在查询还没有这么复杂的时候,它可以正常运行;但现在看来,我无法把所有需要的结果都联接到主要的 serviceProblem 输出中。有时查询能够成功运行,但某些输出为空,而它们本不应该为空。我应该把查询拆分到多个 ASA 作业中,还是应该通过另一个事件中心转发部分输出来避免该错误?谢谢您的帮助。
-- Stream Analytics query: main serviceProblem pipeline.
-- Filters serviceProblem events from the tmfsnowgstm input, explodes their nested
-- arrays (one step per array), pivots name/value pairs into columns, and writes the
-- main record to the tsserviceProblem output.
--
-- Conventions shared by every step:
--   * PartitionId is projected so each step can stay PARTITION BY PartitionId.
--   * The service problem id is projected (as id / ServiceProblemId) because later
--     steps join on it.
--   * Every JOIN carries DATEDIFF(minute, <left>, <right>) BETWEEN 0 AND 0, naming
--     both joined sources, so rows are only matched within the same minute.
WITH OrigQ AS (
    -- Raw input rows whose eventType starts with 'serviceProblem'.
    SELECT
        *
    FROM tmfsnowgstm PARTITION BY PartitionId
    WHERE SUBSTRING(tmfsnowgstm.eventType, 1, 14) = 'serviceProblem'
),
ReaderQuery AS (
    -- Lifts the fields of event.serviceProblem to top-level columns.
    SELECT
        PartitionId,
        event.serviceProblem.*
    FROM OrigQ PARTITION BY PartitionId
),
comment AS (
    -- One row per element of the comment array.
    SELECT
        'comment' AS partitionKey,
        CONCAT(rq.id, '-', comment.ArrayValue.date, comment.ArrayValue.[@type]) AS rowKey,
        rq.PartitionId,
        rq.id AS ServiceProblemId,
        comment.ArrayValue.date AS date,
        comment.ArrayValue.system AS system,
        comment.ArrayValue.author AS author,
        comment.ArrayValue.[@type] AS type,
        comment.ArrayValue.text AS text
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.comment) AS comment
),
trackingRecord AS (
    -- One row per trackingRecord element, keeping its nested extensionInfo array.
    SELECT
        rq.PartitionId,
        rq.id,
        trackingRecord.ArrayValue.extensionInfo
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.trackingRecord) AS trackingRecord
),
trackingRecordDtls AS (
    -- One name/value row per extensionInfo element of each trackingRecord.
    SELECT
        trackingRecord.PartitionId,
        trackingRecord.id AS id,
        extensionInfo.ArrayValue.name AS name,
        extensionInfo.ArrayValue.value AS value
    FROM trackingRecord PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(trackingRecord.extensionInfo) AS extensionInfo
),
relatedParty AS (
    -- One row per relatedParty element: role becomes name, party name becomes value.
    SELECT
        rq.PartitionId,
        rq.id AS id,
        relatedParty.ArrayValue.role AS name,
        relatedParty.ArrayValue.name AS value
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.relatedParty) AS relatedParty
),
relatedPartyDtls AS (
    -- Pivots relatedParty rows into one column per role.
    -- id is projected because extensionInfoDtls3 joins on it.
    SELECT
        rq.PartitionId,
        rq.id,
        rqp.value AS requestedBy,
        cal.value AS caller,
        atg.value AS assignedToGroup,
        onob.value AS ownerNocOpenedBy,
        onc.value AS ownerNoc,
        ast.value AS assignedTo
    FROM ReaderQuery rq PARTITION BY PartitionId
    LEFT JOIN relatedParty rqp
        ON rqp.name = 'requestedBy'
        AND rqp.id = rq.id
        AND DATEDIFF(minute, rq, rqp) BETWEEN 0 AND 0
        AND rq.PartitionId = rqp.PartitionId
    LEFT JOIN relatedParty cal
        ON cal.name = 'caller'
        AND cal.id = rq.id
        AND DATEDIFF(minute, rq, cal) BETWEEN 0 AND 0
        AND rq.PartitionId = cal.PartitionId
    LEFT JOIN relatedParty atg
        ON atg.name = 'assignedToGroup'
        AND atg.id = rq.id
        AND DATEDIFF(minute, rq, atg) BETWEEN 0 AND 0
        AND rq.PartitionId = atg.PartitionId
    -- NOTE(review): this matches the literal role string 'ownerNocOpenedBy,openedBy';
    -- confirm that is the exact role value the source system sends.
    LEFT JOIN relatedParty onob
        ON onob.name = 'ownerNocOpenedBy,openedBy'
        AND onob.id = rq.id
        AND DATEDIFF(minute, rq, onob) BETWEEN 0 AND 0
        AND rq.PartitionId = onob.PartitionId
    LEFT JOIN relatedParty ast
        ON ast.name = 'assignedTo'
        AND ast.id = rq.id
        AND DATEDIFF(minute, rq, ast) BETWEEN 0 AND 0
        AND rq.PartitionId = ast.PartitionId
    LEFT JOIN relatedParty onc
        ON onc.name = 'ownerNoc'
        AND onc.id = rq.id
        AND DATEDIFF(minute, rq, onc) BETWEEN 0 AND 0
        AND rq.PartitionId = onc.PartitionId
),
extensionInfo AS (
    -- One name/value row per element of the top-level extensionInfo array.
    -- id and name are projected because extensionInfoDtls1 joins and filters on them.
    SELECT
        rq.PartitionId,
        rq.id AS id,
        extensionInfo.ArrayValue.name AS name,
        extensionInfo.ArrayValue.value AS value
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.extensionInfo) AS extensionInfo
),
extensionInfoDtls3 AS (
    -- Adds the trackingRecord extension values (one column per name) to the
    -- related-party columns. id is projected because extensionInfoDtls1 joins on it.
    SELECT
        rq.PartitionId,
        rq.id,
        rq.requestedBy,
        rq.caller,
        rq.assignedToGroup,
        rq.ownerNocOpenedBy,
        rq.ownerNoc,
        rq.assignedTo,
        cc.value AS closureCode,
        ooi.value AS originOfIssue,
        rec.value AS resolutionCode,
        rc.value AS rootCause,
        src.value AS subRootCause
    FROM relatedPartyDtls rq PARTITION BY PartitionId
    LEFT JOIN trackingRecordDtls cc
        ON cc.name = 'closureCode'
        AND cc.id = rq.id
        AND DATEDIFF(minute, rq, cc) BETWEEN 0 AND 0
        AND rq.PartitionId = cc.PartitionId
    LEFT JOIN trackingRecordDtls ooi
        ON ooi.name = 'originOfIssue'
        AND ooi.id = rq.id
        AND DATEDIFF(minute, rq, ooi) BETWEEN 0 AND 0
        AND rq.PartitionId = ooi.PartitionId
    LEFT JOIN trackingRecordDtls rec
        ON rec.name = 'resolutionCode'
        AND rec.id = rq.id
        AND DATEDIFF(minute, rq, rec) BETWEEN 0 AND 0
        AND rq.PartitionId = rec.PartitionId
    LEFT JOIN trackingRecordDtls rc
        ON rc.name = 'rootCause'
        AND rc.id = rq.id
        AND DATEDIFF(minute, rq, rc) BETWEEN 0 AND 0
        AND rq.PartitionId = rc.PartitionId
    LEFT JOIN trackingRecordDtls src
        ON src.name = 'subRootCause'
        AND src.id = rq.id
        AND DATEDIFF(minute, rq, src) BETWEEN 0 AND 0
        AND rq.PartitionId = src.PartitionId
),
extensionInfoDtls1 AS (
    -- Adds the top-level extensionInfo values (one column per name) to the
    -- trackingRecord extension columns.
    SELECT
        rq.PartitionId,
        rq.id,
        rq.closureCode,
        rq.originOfIssue,
        rq.resolutionCode,
        rq.rootCause,
        rq.subRootCause,
        wrif.value AS weatherRelatedIssueFlag,
        ncf.value AS notifyCustomerFlag,
        mdu.value AS monitoringDuration,
        cot.value AS contactType,
        cpr.value AS calculatedPriority
    FROM extensionInfoDtls3 rq PARTITION BY PartitionId
    LEFT JOIN extensionInfo wrif
        ON wrif.name = 'weatherRelatedIssueFlag'
        AND wrif.id = rq.id
        AND DATEDIFF(minute, rq, wrif) BETWEEN 0 AND 0
        AND rq.PartitionId = wrif.PartitionId
    LEFT JOIN extensionInfo ncf
        ON ncf.name = 'notifyCustomerFlag'
        AND ncf.id = rq.id
        AND DATEDIFF(minute, rq, ncf) BETWEEN 0 AND 0
        AND rq.PartitionId = ncf.PartitionId
    LEFT JOIN extensionInfo mdu
        ON mdu.name = 'monitoringDuration'
        AND mdu.id = rq.id
        AND DATEDIFF(minute, rq, mdu) BETWEEN 0 AND 0
        AND rq.PartitionId = mdu.PartitionId
    LEFT JOIN extensionInfo cpr
        ON cpr.name = 'calculatedPriority'
        AND cpr.id = rq.id
        AND DATEDIFF(minute, rq, cpr) BETWEEN 0 AND 0
        AND rq.PartitionId = cpr.PartitionId
    LEFT JOIN extensionInfo cot
        ON cot.name = 'contactType'
        AND cot.id = rq.id
        AND DATEDIFF(minute, rq, cot) BETWEEN 0 AND 0
        AND rq.PartitionId = cot.PartitionId
),
affectedResource AS (
    -- One row per affectedResource element.
    -- rowKey is prefixed with the service problem id (same scheme as the comment
    -- step) so the same resource under different problems gets distinct keys; when
    -- the resource id is shorter than 3 characters the role is used instead.
    SELECT
        rq.PartitionId,
        'affectedResource' AS partitionKey,
        CONCAT(
            rq.id,
            '-',
            CASE
                WHEN LEN(affectedResource.ArrayValue.id) < 3 THEN affectedResource.ArrayValue.role
                ELSE affectedResource.ArrayValue.id
            END
        ) AS rowKey,
        'serviceProblem' AS sourceType,
        rq.id AS sourceTypeId,
        affectedResource.ArrayValue.id AS name,
        affectedResource.ArrayValue.name AS value,
        affectedResource.ArrayValue.role AS role,
        affectedResource.ArrayValue.[@referredType] AS type
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.affectedResource) AS affectedResource
),
affectedService AS (
    -- One row per affectedService element.
    -- Column list and order deliberately mirror affectedResource: the two steps are
    -- combined with UNION for the tsrelatedInfo output.
    SELECT
        rq.PartitionId,
        'affectedService' AS partitionKey,
        CONCAT(rq.id, '-', affectedService.ArrayValue.id) AS rowKey,
        'serviceProblem' AS sourceType,
        rq.id AS sourceTypeId,
        affectedService.ArrayValue.id AS name,
        affectedService.ArrayValue.name AS value,
        'affectedService' AS role,
        'Service' AS type
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.affectedService) AS affectedService
),
relatedObject AS (
    -- One row per relatedObject element: referred type becomes name.
    -- Reads ReaderQuery (not the raw input) so the eventType filter applies and the
    -- service problem id is available for the joins in relatedObjectDtls.
    SELECT
        rq.PartitionId,
        rq.id AS id,
        relatedObject.ArrayValue.[@referredType] AS name,
        relatedObject.ArrayValue.name AS value
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.relatedObject) AS relatedObject
),
relatedObjectDtls AS (
    -- Pivots relatedObject rows into one column per referred type.
    -- PartitionId and id are projected because the final SELECT joins on them.
    SELECT
        rq.PartitionId,
        rq.id AS id,
        'relatedObject' AS partitionKey,
        rq.id AS rowKey,
        prd.value AS product,
        chr.value AS changeRequest,
        ser.value AS service,
        res.value AS resource,
        rccr.value AS rootCauseChangeRequest
    FROM ReaderQuery rq PARTITION BY PartitionId
    LEFT JOIN relatedObject prd
        ON prd.name = 'product'
        AND prd.id = rq.id
        AND DATEDIFF(minute, rq, prd) BETWEEN 0 AND 0
        AND rq.PartitionId = prd.PartitionId
    LEFT JOIN relatedObject chr
        ON chr.name = 'changeRequest'
        AND chr.id = rq.id
        AND DATEDIFF(minute, rq, chr) BETWEEN 0 AND 0
        AND rq.PartitionId = chr.PartitionId
    LEFT JOIN relatedObject ser
        ON ser.name = 'service'
        AND ser.id = rq.id
        AND DATEDIFF(minute, rq, ser) BETWEEN 0 AND 0
        AND rq.PartitionId = ser.PartitionId
    LEFT JOIN relatedObject res
        ON res.name = 'resource'
        AND res.id = rq.id
        AND DATEDIFF(minute, rq, res) BETWEEN 0 AND 0
        AND rq.PartitionId = res.PartitionId
    LEFT JOIN relatedObject rccr
        ON rccr.name = 'rootCauseChangeRequest'
        AND rccr.id = rq.id
        AND DATEDIFF(minute, rq, rccr) BETWEEN 0 AND 0
        AND rq.PartitionId = rccr.PartitionId
),
serviceProblem AS (
    -- Scalar service problem fields plus the ids taken from the
    -- associatedTroubleTicket and parentProblem arrays. The raw arrays themselves
    -- are not projected; only fields of their elements are.
    -- OUTER APPLY (rather than CROSS APPLY) keeps events whose array is empty; that
    -- is the case the GetArrayLength(...) < 1 branch below exists for.
    -- NOTE(review): the element field is spelled "Id" here but "id" in the other
    -- steps; confirm the spelling used in the incoming JSON.
    SELECT
        rq.PartitionId,
        rq.id,
        'serviceProblem' AS partitionKey,
        rq.name,
        rq.statusChangeReason,
        rq.reason,
        rq.resolutionDate,
        rq.responsibleParty,
        rq.description,
        rq.underlyingProblem,
        rq.statusChangeDate,
        rq.problemEscalation,
        rq.associatedSLAViolation,
        rq.timeChanged,
        rq.severity,
        rq.impactPatterns,
        rq.timeRaised,
        rq.underlyingAlarm,
        rq.rootCauseService,
        rq.relatedEvent,
        rq.originatorParty,
        rq.priority,
        rq.firstAlert,
        rq.originatingSystem,
        rq.affectedNumberOfServices,
        rq.impactImportanceFactor,
        rq.category,
        rq.affectedLocation,
        rq.status,
        rq.rootCauseResource,
        CASE
            WHEN GetArrayLength(rq.associatedTroubleTicket) < 1 THEN ''
            ELSE associatedTroubleTicket.ArrayValue.Id
        END AS associatedTroubleTicket,
        parentProblem.ArrayValue.Id AS parentProblemId,
        parentProblem.ArrayValue.correlationId AS correlationId
    FROM ReaderQuery rq PARTITION BY PartitionId
    OUTER APPLY GetArrayElements(rq.associatedTroubleTicket) AS associatedTroubleTicket
    OUTER APPLY GetArrayElements(rq.parentProblem) AS parentProblem
)
-- Main output: one row per serviceProblem row, enriched with the texts of its
-- 'resolutionNote' and 'serviceProblemDescription' comments.
SELECT
    'serviceProblem' AS partitionKey,
    rq.associatedTroubleTicket,
    resnt.text AS resolutionNote,
    svcdesc.text AS serviceProblemDescription,
    rq.parentProblemId,
    rq.correlationId
INTO tsserviceProblem
FROM serviceProblem rq PARTITION BY PartitionId
LEFT JOIN comment resnt
    ON rq.id = resnt.ServiceProblemId
    AND DATEDIFF(minute, rq, resnt) BETWEEN 0 AND 0
    AND rq.PartitionId = resnt.PartitionId
    AND resnt.type = 'resolutionNote'
LEFT JOIN comment svcdesc
    ON rq.id = svcdesc.ServiceProblemId
    AND DATEDIFF(minute, rq, svcdesc) BETWEEN 0 AND 0
    AND rq.PartitionId = svcdesc.PartitionId
    AND svcdesc.type = 'serviceProblemDescription'
LEFT JOIN relatedObjectDtls rpdtls
    ON rq.id = rpdtls.id
    AND DATEDIFF(minute, rq, rpdtls) BETWEEN 0 AND 0
    AND rq.PartitionId = rpdtls.PartitionId
-- Writes every column of the extensionInfoDtls1 step to the tsextensionInfo output.
SELECT
    ext.*
INTO tsextensionInfo
FROM extensionInfoDtls1 ext PARTITION BY PartitionId
-- Writes comment rows to the tscomment output, excluding the two comment types
-- that the tsserviceProblem output already folds into its own columns.
SELECT
    *
INTO tscomment
FROM comment PARTITION BY PartitionId
WHERE type <> 'serviceProblemDescription'
    AND type <> 'resolutionNote'
-- Writes affected resources and affected services to the single tsrelatedInfo
-- output by combining both steps with UNION.
SELECT
    *
INTO tsrelatedInfo
FROM affectedResource PARTITION BY PartitionId
UNION
SELECT
    *
FROM affectedService PARTITION BY PartitionId
-- Copies every row of the OrigQ step, unchanged, to the veritaslake output.
SELECT
    *
INTO veritaslake
FROM OrigQ PARTITION BY PartitionId
解决方法
从错误消息可以看出,事件中心的每个使用者组在每个分区上最多只允许 5 个接收者同时连接。对于使用了 UNION、自联接(self-join)等运算符的复杂查询,作业很可能会创建多个输入接收者。您可以按照故障排除文档“maximum number of allowed receivers per partition”重构查询,以解决此错误。