Python zeep SOAP 请求签名失败:无法加载证书

问题描述

我正在尝试使用 Python zeep 发出 SOAP 请求,该请求需要使用 RSA 2048 密钥进行 sha256 签名。由于某种原因,我无法加载签名,并且在尝试发送请求时出现“签名失败:无法加载证书”错误。

下面的示例代码

from zeep import Client as cl
from requests import Session
from zeep.transports import Transport
from zeep.plugins import HistoryPlugin
from lxml import etree
import zeep
from zeep.wsse.signature import Signature
import uuid
import OpenSSL
import base64
from zeep.wsse import utils
from datetime import datetime,timedelta


# Helper Class
class SingnatureOverride(Signature):
    """Signature plugin that adds a ``wsu:Timestamp`` before signing.

    The timestamp is appended to the WS-Security header first and the
    actual signing is then delegated to the parent ``Signature`` class, so
    the element is already in the header when the parent runs.
    Response verification is disabled (see ``verify``).
    """

    def apply(self, envelope, headers):
        """Append a one-minute Timestamp to the security header and sign.

        Args:
            envelope: The outgoing SOAP envelope element.
            headers: The outgoing HTTP headers.

        Returns:
            The ``(envelope, headers)`` pair after signing.
        """
        security = utils.get_security_header(envelope)

        # Naive UTC time truncated to whole seconds; the trailing 'Z' is
        # appended by hand below because isoformat() on a naive datetime
        # emits no zone designator.
        created = datetime.utcnow().replace(microsecond=0)
        expired = created + timedelta(seconds=60)

        timestamp = utils.WSU('Timestamp')
        timestamp.append(utils.WSU('Created', created.isoformat() + 'Z'))
        timestamp.append(utils.WSU('Expires', expired.isoformat() + 'Z'))

        security.append(timestamp)

        super().apply(envelope, headers)
        return envelope, headers

    # Override response verification and skip it for now.
    # Zeep does not support signature verification with a different certificate.
    # Ref. https://github.com/mvantellingen/python-zeep/pull/822/
    # "Add support for different signing and verification certificates #822"
    def verify(self, envelope):
        """Return the response envelope unchanged without verifying it."""
        return envelope


# Function to generate certificates and keys
def create_csr(common_name, country=None, state=None, city=None,
               organization=None, organizational_unit=None,
               email_address=None):
    """Generate an RSA-2048 key pair and a self-signed X.509 certificate.

    Despite the name, this builds an ``X509`` certificate object rather
    than a certificate signing request. The certificate is self-signed
    with SHA-256 and is given a serial number, a validity window and an
    issuer before signing; without those fields the PEM output could not
    be loaded by the signing layer ("cannot load cert").

    Args:
        common_name: Subject CN (required).
        country: Subject C, optional.
        state: Subject ST, optional.
        city: Subject L, optional.
        organization: Subject O, optional.
        organizational_unit: Subject OU, optional.
        email_address: Subject emailAddress, optional.

    Returns:
        dict with the keys ``"private key"``, ``"public key"`` and
        ``"certificate"`` (PEM-encoded bytes) and ``"reqest"`` (the
        ``X509`` object itself).
    """
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    req = OpenSSL.crypto.X509()
    subject = req.get_subject()
    subject.CN = common_name
    # Optional subject fields, applied in the same order as before so the
    # resulting distinguished name is unchanged.
    optional_fields = (
        ("OU", organizational_unit),
        ("O", organization),
        ("L", city),
        ("ST", state),
        ("C", country),
        ("emailAddress", email_address),
    )
    for attribute, value in optional_fields:
        if value:
            setattr(subject, attribute, value)

    # Fields required for the certificate to be loadable: serial number,
    # validity window (now .. now + ~10 years) and issuer (self-signed, so
    # the issuer is the subject).
    req.set_serial_number(1000)
    req.gmtime_adj_notBefore(0)
    req.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    req.set_issuer(subject)

    req.set_pubkey(key)
    req.sign(key, 'sha256')

    private_key = OpenSSL.crypto.dump_privatekey(
        OpenSSL.crypto.FILETYPE_PEM, key)
    public_key = OpenSSL.crypto.dump_publickey(
        OpenSSL.crypto.FILETYPE_PEM, key)
    csr = OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_PEM, req)

    out_keys = {
        "private key": private_key,
        "public key": public_key,
        "certificate": csr,
        "reqest": req,
    }
    return out_keys


# Build the key material and certificate for the signer.
certificate = create_csr(
    common_name="somename",
    organizational_unit="unit",
    organization="unit",
    city="test",
    state="test",
    country="GB",
)

# Write certificate and keys to files (PEM bytes, hence binary mode).
for filename, entry in (
    ('cert.pem', 'certificate'),
    ('public.pem', 'public key'),
    ('private.pem', 'private key'),
):
    with open(filename, "wb") as f:
        f.write(certificate[entry])



# WSDL url. Must be assigned before the client is constructed, because the
# client constructor below takes it as its first argument.
url = "https://stest.bankconnect.dk/2019/04/04/services/CorporateService?wsdl"

# Create Client
history = HistoryPlugin()
session = Session()
# SECURITY: TLS certificate validation is disabled here.
session.verify = False  # For some reason SSL validation fails and I had to ignore it. This is unacceptable and needs to be addressed,but it is outside of the scope of this question.
transport = Transport(session=session)
client = cl(
    url,
    transport=transport,
    plugins=[history],
    wsse=SingnatureOverride("private.pem", "cert.pem"),
)

# build headers
Now = datetime.now()
unique_id = uuid.uuid4().hex  # 32 hex characters, used as the message id
headers = {
    'serviceHeader': {
        'organisationIdentification': {
            'mainRegistrationNumber': "randint",
            'isoCountryCode': 'GB',
        },
        'format': 'ISO20022',
        'functionIdentification': "randint",
        'erpinformation': 'randstring',
        'endToEndMessageId': unique_id,
        # %S is zero-padded seconds; lowercase %s is a non-portable
        # platform extension and does not mean seconds.
        'createDateTime': Now.strftime("%Y-%m-%dT%H:%M:%S"),
    },
}

# Send request
# InternalError: (-1,'cannot load cert') was raised here when the signing
# certificate file could not be loaded.
client.service.getCustomerStatement(_soapheaders=headers)

# debug: dump the last request/response envelopes recorded by the history
# plugin. Both are serialized with encoding="unicode" so that str (not
# bytes) is printed and the pretty-printed line breaks are visible.
sent = etree.tostring(history.last_sent["envelope"], encoding="unicode", pretty_print=True)
received = etree.tostring(history.last_received["envelope"], encoding="unicode", pretty_print=True)

print(sent)
print(received)

我不得不匿名化一些机密信息,但是如果对于解决该问题至关重要,请随时询问详细信息。

编辑:添加了测试网址

Edit2:添加了正确的标头,以便于复制

解决方法

我想我明白了。在对证书签名之前,将以下几行添加到 'create_csr' 函数中似乎可以解决此问题。这样可以消除“无法加载证书”这一错误并成功发送请求,但是服务器仍然认为签名无效,因此我不确定是辅助函数生成了无效的证书,还是服务器端的问题。

def create_csr(*args):
....
    req.set_serial_number(1000)
    req.gmtime_adj_notBefore(0)
    req.gmtime_adj_notAfter(10*365*24*60*60)
    req.set_issuer(req.get_subject())

    req.sign(key,'sha256')
....