Asynchronous multi-threaded cURL application in PHP

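Problem description

I am looking for a process/script that can handle at least 300-400 transactions per second. Currently I am using Workerman for the job below. I can run 400 threads without any problem, but throughput is only about 60-70 TPS, with a latency of less than one second.

Below is the working code:

main.php: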

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;

$http_worker = new Worker('http://0.0.0.0:2345');

$http_worker->count = 400;
$http_worker->onMessage = function ($connection,$request) {
//Config
    $connection->send("");
    $url = 'http://localhost:3000';
    $packageid=11;
    $payload = $request->post();
    $temp_payload = implode("|",$payload);
    list($id,$user,$package_id,$timestamp) = explode('|',$temp_payload);

    $date_request_first=date('Y-m-d H:i:s');// PHP Worker current casting timestamp
    $date_compare1= date("Y-m-d h:i:s a",strtotime($date_request_first));
    
    $xml_get_subscriber='<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" ><SOAP-ENV:Body>
    <ns2:root>
        <msg_head>
            <time>2020-08-20 17:57:29</time>
            <from />
            <to />
            <msg_type />
            <serial />
        </msg_head>
        <interface_msg>
            <msg_response>
                <ResponseClass Name="Response">
                    <GetUserClass Name="AJAX">
                        <ResultCode>0</ResultCode>
                        <ResultDescr>success</ResultDescr>
                        <IBAN>'.$id.'</IBAN>
                        <PREFERNOTIFYMETHOD>1</PREFERNOTIFYMETHOD>
                    </GetUserClass>
                </ResponseClass>
            </msg_response>
        </interface_msg>
    </ns2:root>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>';

$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$provisioning_recipe));
//$doc->loadXML($result_provisioning);

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//ResultCode/text()")  as $package) {
    $resultCode = $package->textContent;
}
if (isset($resultCode) && $resultCode == 0) {


$doc = new DOMDocument();
$doc->loadXML(getUrlContent($url,$id,$package_id,$xml_get_subscriber));

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//PACKAGEID/text()")  as $match1) {
    $PAK_checking = $match1->textContent;
}

$myArrayPak = explode('$',$PAK_checking);
$key_value = array_search($packageid,$myArrayPak);
        // array_search() returns false when nothing is found; index 0 is a valid match
        if($key_value !== false)
        {
            
        $conf = new RdKafka\Conf();
        $conf->set('metadata.broker.list','192.168.0.16:9092');
        
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic("successful-request");
        $produce_date =date('Y-m-d H:i:s');
        
        $ar=date_create($myArrayEndDate[$key_value]);
        $final_date = date_format($ar,"Y-m-d H:i:s");
        $toStore="$id;$msisdn;$package_id;1;$final_date";
        echo "PackageID = $myArrayPak[$key_value],End-Date = $final_date\n";
        $topic->produce(RD_KAFKA_PARTITION_UA,0,$toStore);
        $producer->poll(0);
        
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush,messages might be lost!');
        }   


}
}


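};

// Sends the XML request to the southbound system and returns the response
// body, or false when the HTTP status is not 200 (the failure is also
// recorded on the Failed-request Kafka topic).
function getUrlContent($url,$uid,$pckg,$xml_get_subscriber_info){
    $ch = curl_init();
    curl_setopt($ch,CURLOPT_URL,$url);
    curl_setopt($ch,CURLOPT_RETURNTRANSFER,1);
    curl_setopt($ch,CURLOPT_CONNECTTIMEOUT,5);
    // The CURLOPT_TIMEOUT value was lost in the original post; 5 seconds is an assumption.
    curl_setopt($ch,CURLOPT_TIMEOUT,5);
    curl_setopt($ch,CURLOPT_POSTFIELDS,$xml_get_subscriber_info);
    $data = curl_exec($ch);
    $httpcode = curl_getinfo($ch,CURLINFO_HTTP_CODE);
    curl_close($ch);
    //return ($httpcode>=200 && $httpcode<300) ? $data : false;
    if ($httpcode == 200) {
        return $data;
    }

    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list','192.168.0.17:9092');

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic("Failed-request");
    $produce_date = date('Y-m-d H:i:s');

    // $msisdn is not in scope inside this function, so that field is left empty.
    $toStore = "$uid;;$pckg;$produce_date";
    $topic->produce(RD_KAFKA_PARTITION_UA,0,$toStore);
    $producer->poll(0);

    for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
        $result = $producer->flush(10000);
        if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
            break;
        }
    }
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new \RuntimeException('Was unable to flush,messages might be lost!');
    }

    return false;
}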


// run all workers
Worker::runAll();
?>

Now, when I increase the threads to 800, the problems start:

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
PHP Notice:  Undefined variable: PAK in test.php.php on line 77

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
PHP Notice:  Undefined variable: PAK_checking in test.php.php on line 197

Notice: Undefined variable: PAK_checking in test.php.php on line 197
PHP Notice:  Undefined variable: psetdatelist in test.php.php on line 198

Notice: Undefined variable: psetdatelist in test.php.php on line 198
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

I checked, and when I increase the number of workers, the XML returned by cURL appears to be empty. With 400 http_worker processes there is no problem at all.
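The warnings above are raised when an empty string reaches DOMDocument::loadXML(). A minimal sketch of a guard, assuming a hypothetical helper named extractResultCode() (it is not part of the original script), which returns null instead of raising warnings when the response is empty or malformed:

```php
<?php
// Hypothetical helper (not in the original script): returns the text of the
// first <ResultCode> element, or null if the body is empty, is not
// well-formed XML, or has no such element.
function extractResultCode(?string $xml): ?string
{
    if ($xml === null || trim($xml) === '') {
        return null; // empty cURL response: nothing to parse
    }

    // Collect libxml errors internally instead of emitting PHP warnings.
    $previous = libxml_use_internal_errors(true);
    $doc = new DOMDocument();
    $loaded = $doc->loadXML($xml);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    if (!$loaded) {
        return null; // malformed XML
    }

    $xpath = new DOMXPath($doc);
    $nodes = $xpath->query('//ResultCode');
    if ($nodes === false || $nodes->length === 0) {
        return null; // element not present
    }

    return trim($nodes->item(0)->textContent);
}

var_dump(extractResultCode(''));                                  // NULL
var_dump(extractResultCode('<r><ResultCode>0</ResultCode></r>')); // string(1) "0"
```

With a guard like this, the caller can treat null as a failed southbound check rather than continuing with undefined variables.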

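I am running on a VM with an 8-core CPU and 16 GB of RAM. My goal is to handle as many requests as possible from the northbound side and check them against the southbound side at a minimum of 300 TPS.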

The processing flow is as follows:

Client -> main.php -> southbound check -> produce to Kafka topic

Problem: when I increase from 400 threads to 800 threads, my cURL responses are always empty.
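The script above does not return the response body whenever the HTTP status is not 200, and that includes the case where curl_exec() itself fails and the status is 0. The reason for such a failure is available from curl_errno() and curl_error(). A minimal sketch, assuming a hypothetical helper postXml() (the name, the Content-Type header, and the 5-second timeouts are illustrative and not from the original script):

```php
<?php
// Hypothetical helper: POSTs an XML body and returns
// ['body' => string|null, 'status' => int, 'error' => string|null].
function postXml(string $url, string $xml): array
{
    $ch = curl_init();
    curl_setopt_array($ch, [
        CURLOPT_URL            => $url,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_CONNECTTIMEOUT => 5, // assumed value
        CURLOPT_TIMEOUT        => 5, // assumed value
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => $xml,
        CURLOPT_HTTPHEADER     => ['Content-Type: text/xml; charset=UTF-8'],
    ]);

    $body   = curl_exec($ch); // false on transport failure
    $errno  = curl_errno($ch);
    $error  = $errno !== 0
        ? sprintf('cURL error %d: %s', $errno, curl_error($ch))
        : null;
    $status = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
    // The handle is released when $ch goes out of scope.

    return [
        'body'   => is_string($body) ? $body : null,
        'status' => $status,
        'error'  => $error,
    ];
}

// Port 1 on localhost is assumed to be closed, so this call should fail fast.
$result = postXml('http://127.0.0.1:1/', '<ping/>');
if ($result['error'] !== null) {
    error_log($result['error']);
}
```

Logging the error text for each empty response should show whether the failures are timeouts, refused connections, or something else.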

Is there any approach I can use to send at least 300 TPS and do the XML parsing without problems? Or do you have any suggestions for improving my process?

Solution

I found that the operating system (Unix) has limitations when handling many concurrent requests above 200-300 TPS.
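The answer does not say which limit was hit. One common candidate, offered here only as an assumption, is the per-process limit on open file descriptors, since every socket counts against it. A minimal sketch that prints the current limits, assuming the posix extension is available (openFileLimits() is a hypothetical helper name):

```php
<?php
// Assumption: the relevant OS limit is the open-file (descriptor) limit.
// Requires the posix extension, which is not available on Windows.
function openFileLimits(): ?array
{
    if (!function_exists('posix_getrlimit')) {
        return null;
    }

    $limits = posix_getrlimit();
    if ($limits === false) {
        return null;
    }

    // Values are integers, or the string "unlimited".
    return [
        'soft' => $limits['soft openfiles'] ?? null,
        'hard' => $limits['hard openfiles'] ?? null,
    ];
}

$limits = openFileLimits();
if ($limits === null) {
    echo "posix_getrlimit() is not available\n";
} else {
    printf("open files: soft=%s hard=%s\n", $limits['soft'], $limits['hard']);
}
```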

I found an alternative using GuzzleHttp, and it helped. It got me to 600+ TPS, with a transaction latency of 2 seconds.
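No code was posted with this answer. A minimal sketch of what concurrent requests with Guzzle might look like, assuming Guzzle 7 installed through Composer. The URL is the one from the question. The payload list, the concurrency of 50, and the 5-second timeouts are illustrative assumptions:

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use GuzzleHttp\Client;
use GuzzleHttp\Pool;
use GuzzleHttp\Psr7\Request;

$client = new Client(['timeout' => 5, 'connect_timeout' => 5]); // assumed values
$url = 'http://localhost:3000';

// Illustrative payloads; in the real script these would be the SOAP bodies.
$payloads = ['<a/>', '<b/>', '<c/>'];

// Generator yielding one PSR-7 request per payload.
$requests = function (array $payloads) use ($url) {
    foreach ($payloads as $xml) {
        yield new Request('POST', $url, ['Content-Type' => 'text/xml'], $xml);
    }
};

$pool = new Pool($client, $requests($payloads), [
    'concurrency' => 50, // assumed; maximum number of requests in flight
    'fulfilled' => function ($response, $index) {
        $body = (string) $response->getBody();
        echo "request $index: HTTP {$response->getStatusCode()}, "
            . strlen($body) . " bytes\n";
    },
    'rejected' => function ($reason, $index) {
        // Called on connection errors, timeouts, and (by default) 4xx/5xx responses.
        echo "request $index failed: " . $reason->getMessage() . "\n";
    },
]);

// Start the transfers and block until all of them have completed.
$pool->promise()->wait();
```

Unlike the blocking curl_exec() call in the question, a pool like this keeps many transfers in flight from a single process, so throughput no longer depends on the number of worker processes alone.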