用于 Fast-RTPS 的 Streams API

问题描述

我想使用 fast-rtps订阅者发布视频(流数据)。虽然我成功发布了十个连续的jp​​g文件,但订阅者收到的每张图片都浪费了大量时间来处理,因为我使用函数get_byte_value逐个像素。

有谁知道fast-rtps中间件如何更高效地发布和订阅? (创建新类型?其他?)

以下是我的发布者和订阅者的代码

Publisher.cpp

// copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License,Version 2.0 (the "License");

// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License,Version 2.0 (the "License");

/**
 * @file PicturePublisher.cpp
 *
 */

#include "Publisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <thread>
#include <time.h>
#include <vector>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

PicturePublisher::PicturePublisher()
    : mp_participant(nullptr),mp_publisher(nullptr),m_DynType(DynamicType_ptr(nullptr))
{
}

bool PicturePublisher::init()
{
    cv::Mat image = cv::imread("drone.jpg",1);
    std::vector<unsigned char> buffer;
    cv::imencode(".jpg",image,buffer);
    
    // Create basic builders
    DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());

    DynamicType_ptr octet_type(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
    DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type,3873715));
    DynamicType_ptr sequence_type = sequence_type_builder->build();
    
    // Add members to the struct. By the way,id must be consecutive starting by zero.
    struct_type_builder->add_member(0,"index",DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(1,"size",DynamicTypeBuilderFactory::get_instance()->create_uint32_type());
    struct_type_builder->add_member(2,"Picture",sequence_type);
    struct_type_builder->set_name("Picture"); // Need to be same with topic data type
    
    DynamicType_ptr dynType = struct_type_builder->build();
    m_DynType.SetDynamicType(dynType);
    m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);
    
    m_DynHello->set_uint32_value(0,0);
    m_DynHello->set_uint32_value(buffer.size(),1);
    
    MemberId id;
    // std::cout << "init: " << id << std::endl;
    
    DynamicData* sequence_data = m_DynHello->loan_value(2);
    for (int i = 0; i < buffer.size(); i++) {
        if (i == buffer.size() - 1) {
            std::cout << "Total Size: " << i + 1 << std::endl;
        }
        sequence_data->insert_byte_value(buffer[i],id);
    }
    m_DynHello->return_loaned_value(sequence_data);

    ParticipantAttributes PParam;
    PParam.rtps.setName("DynPicture_pub");
    mp_participant = Domain::createParticipant(PParam,(ParticipantListener*)&m_part_list);

    if (mp_participant == nullptr)
    {
        return false;
    }

    //REGISTER THE TYPE
    Domain::registerDynamicType(mp_participant,&m_DynType);

    //CREATE THE PUBLISHER
    PublisherAttributes Wparam;
    Wparam.topic.topicKind = NO_KEY;
    Wparam.topic.topicDataType = "Picture";
    Wparam.topic.topicName = "Picturetopic";

    mp_publisher = Domain::createPublisher(mp_participant,Wparam,(PublisherListener*)&m_listener);
    if (mp_publisher == nullptr)
    {
        return false;
    }

    return true;

}

PicturePublisher::~PicturePublisher()
{
    Domain::removeParticipant(mp_participant);

    DynamicDataFactory::get_instance()->delete_data(m_DynHello);

    Domain::stopAll();
}

void PicturePublisher::PubListener::onPublicationMatched(
        Publisher* /*pub*/,MatchingInfo& info)
{
    if (info.status == MATCHED_MATCHING)
    {
        n_matched++;
        firstConnected = true;
        std::cout << "Publisher matched" << std::endl;
    }
    else
    {
        n_matched--;
        std::cout << "Publisher unmatched" << std::endl;
    }
}

void PicturePublisher::PartListener::onParticipantdiscovery(
        Participant*,ParticipantdiscoveryInfo&& info)
{
    if (info.status == ParticipantdiscoveryInfo::disCOVERED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
    }
    else if (info.status == ParticipantdiscoveryInfo::REMOVED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
    }
    else if (info.status == ParticipantdiscoveryInfo::DROPPED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
    }
}

void PicturePublisher::runThread(
        uint32_t samples,uint32_t sleep)
{
    uint32_t i = 0;

    while (!stop && (i < samples || samples == 0))
    {
        if (publish(samples != 0))
        {
            uint32_t index;
            m_DynHello->get_uint32_value(index,0);
            std::cout << "runThreading...; \tSample Index: " << index << "; \t";

            uint32_t size;
            m_DynHello->get_uint32_value(size,1);
            std::cout << "size: " << size << std::endl;
            

            if (i == 9){
                std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
                // Avoid unmatched condition impact subscriber receiving message
                std::cout << "Wait within twenty second..." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(10000));
            }
            ++i;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
    }
}

void PicturePublisher::run(
        uint32_t samples,uint32_t sleep)
{
    stop = false;
    std::thread thread(&PicturePublisher::runThread,this,samples,sleep);
    if (samples == 0)
    {
        std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
        std::cin.ignore();
        stop = true;
    }
    else
    {
        std::cout << "Publisher running " << samples << " samples." << std::endl;
    }
    thread.join();
}

bool PicturePublisher::publish(
        bool waitForListener)
{
    // std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
    if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
    {
        uint32_t index;
        m_DynHello->get_uint32_value(index,0);
        m_DynHello->set_uint32_value(index + 1,0);

        mp_publisher->write((void*)m_DynHello);
        
        return true;
    }
    return false;
}

PicturePublisher::init() 函数

Subsciber.cpp

// copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License,Version 2.0 (the "License");

/**
 * @file Subscriber.cpp
 *
 */

#include "Subscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <vector>
#include <string>
#include <sstream>
#include <iterator>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

PictureSubscriber::PictureSubscriber()
    : mp_participant(nullptr),mp_subscriber(nullptr),m_DynType(DynamicType_ptr(nullptr))
{
}

struct timespec begin,end;
double elapsed;
std::vector<unsigned char> buffer;

bool PictureSubscriber::init()
{

    ParticipantAttributes PParam;
    PParam.rtps.setName("DynPicture_sub");
    mp_participant = Domain::createParticipant(PParam,(ParticipantListener*)&m_part_list);
    if (mp_participant == nullptr)
    {
        return false;
    }

    // Create basic builders
    DynamicTypeBuilder_ptr struct_type_builder(DynamicTypeBuilderFactory::get_instance()->create_struct_builder());

    DynamicTypeBuilder_ptr octet_builder(DynamicTypeBuilderFactory::get_instance()->create_byte_builder());
    DynamicTypeBuilder_ptr sequence_type_builder(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_builder.get(),3873715));
    DynamicType_ptr sequence_type = sequence_type_builder->build();

    // Add members to the struct.
    struct_type_builder->add_member(0,sequence_type);
    struct_type_builder->set_name("Picture");
    
    DynamicType_ptr dynType = struct_type_builder->build();
    m_DynType.SetDynamicType(dynType);
    m_listener.m_DynHello = DynamicDataFactory::get_instance()->create_data(dynType);

    //REGISTER THE TYPE
    Domain::registerDynamicType(mp_participant,&m_DynType);

    //CREATE THE SUBSCRIBER
    SubscriberAttributes Rparam;
    Rparam.topic.topicKind = NO_KEY;
    Rparam.topic.topicDataType = "Picture";
    Rparam.topic.topicName = "Picturetopic";

    mp_subscriber = Domain::createSubscriber(mp_participant,Rparam,(SubscriberListener*)&m_listener);

    if (mp_subscriber == nullptr)
    {
        return false;
    }


    return true;
}

PictureSubscriber::~PictureSubscriber()
{
    Domain::removeParticipant(mp_participant);

    DynamicDataFactory::get_instance()->delete_data(m_listener.m_DynHello);

    Domain::stopAll();
}

void PictureSubscriber::SubListener::onSubscriptionMatched(
        Subscriber* /*sub*/,MatchingInfo& info)
{
    if (info.status == MATCHED_MATCHING)
    {
        n_matched++;
        std::cout << "Subscriber matched" << std::endl;
    }
    else
    {
        n_matched--;
        std::cout << "Subscriber unmatched" << std::endl;
    }
}

void PictureSubscriber::PartListener::onParticipantdiscovery(
        Participant*,ParticipantdiscoveryInfo&& info)
{
    if (info.status == ParticipantdiscoveryInfo::disCOVERED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
    }
    else if (info.status == ParticipantdiscoveryInfo::REMOVED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
    }
    else if (info.status == ParticipantdiscoveryInfo::DROPPED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
    }
}

void PictureSubscriber::SubListener::onNewDataMessage(
        Subscriber* sub)
{
    if (sub->takeNextData((void*)m_DynHello,&m_info))
    {
        if (m_info.sampleKind == ALIVE)
        {
            this->n_samples++;
            
            // Print your structure data here.
            uint32_t index;
            m_DynHello->get_uint32_value(index,0);
            std::cout << "index: " << index <<  "; \t";

            uint32_t size;
            m_DynHello->get_uint32_value(size,1);
            std::cout << "size: " << size <<  std::endl; 
            

            DynamicType_ptr octet_type_temp(DynamicTypeBuilderFactory::get_instance()->create_byte_type());
            DynamicTypeBuilder_ptr sequence_type_builder_temp(DynamicTypeBuilderFactory::get_instance()->create_sequence_builder(octet_type_temp,3873715));
            DynamicType_ptr sequence_type_temp = sequence_type_builder_temp->build();

            DynamicData* sequence_data_temp = m_DynHello->loan_value(2);
            for (int i = 0; i < size; i++) {
                buffer.push_back(sequence_data_temp->get_byte_value(i));
            }
            m_DynHello->return_loaned_value(sequence_data_temp);
            
            cv::Mat imageDecoded = cv::imdecode(buffer,1);

            cv::imwrite(std::to_string(index) + "_droneNew.jpg",imageDecoded);
        }
    }
}

void PictureSubscriber::run()
{
    std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
    std::cin.ignore();
}

void PictureSubscriber::run(
        uint32_t number)
{
    std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
    while (number > this->m_listener.n_samples)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

PictureSubscriber::SubListener::onNewDataMessage(Subscriber* sub) 函数

解决方法

在 eProsima,我们为您指出的问题找到了一些解决方案。

首先,请注意您不需要使用动态类型来定义包含您要传输的图像的类型。在您的情况下,最简单的做法是通过 IDL 文件定义您的类型。使用 IDL 文件和 Fast-DDS-Gen 工具,您可以生成用于访问数据类型元素的代码,以及自动生成数据序列化和反序列化函数。在 Picture.idl 文件中,您将找到以 IDL 格式定义的最适合您使用动态类型创建的数据类型的类型。 Here 您可以找到有关如何使用 Fast-DDS-Gen 工具的指南。在本文档中,您还将找到有关如何使用 IDL 文件生成完整 DDS 发布者/订阅者应用程序的 complete example 以及数据的 supported formats。下面还有文件 Publisher.cppSubscriber.cpp 根据新的数据类型进行了修改。

我们还建议您查看示例 HelloWorldExample,因为它最适合您的需求。在此示例中,您还可以发现最新版本的 Fast DDS (2.1.0) 中包含的新 DDS API。

作为附加说明,我们建议您不要传输八位字节向量,而是在传输之前以字符串 base64 格式对图像进行编码,因为它是最广泛的图像传输格式之一。

图片.idl

struct Picture
{
    unsigned long index;
    unsigned long size;
    sequence<octet> picture;
};

Publisher.cpp

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License,Version 2.0 (the "License");

// Hpshboss modifys code from eprosima's github example;
// Licensed under the Apache License,Version 2.0 (the "License");

/**
 * @file PicturePublisher.cpp
 *
 */

#include "PicturePublisher.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>
#include <fastrtps/publisher/Publisher.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <thread>
#include <time.h>
#include <vector>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

PicturePublisher::PicturePublisher()
    : mp_participant(nullptr),mp_publisher(nullptr)
{
}

bool PicturePublisher::init()
{
    cv::Mat image = cv::imread("dog.jpg",cv::IMREAD_COLOR);

    if(image.empty())
    {
        std::cout << "Could not read the image." << std::endl;
        return false;
    }
    cv::imshow("Display window",image);
    int k = cv::waitKey(0);

    std::vector<unsigned char> buffer;

    if(!cv::imencode(".jpg",image,buffer)){
        printf("Image encoding failed");
    }

    m_Picture.index(0);
    m_Picture.size(buffer.size());
    m_Picture.picture(buffer);

    ParticipantAttributes PParam;
    PParam.rtps.setName("Picture_pub");
    mp_participant = Domain::createParticipant(PParam,&m_part_list);

    if (mp_participant == nullptr)
    {
        return false;
    }

    //REGISTER THE TYPE
    Domain::registerType(mp_participant,&m_type);
    // Domain::registerDynamicType(mp_participant,&m_DynType);

    //CREATE THE PUBLISHER
    PublisherAttributes Wparam;
    Wparam.topic.topicKind = NO_KEY;
    Wparam.topic.topicDataType = "Picture";
    Wparam.topic.topicName = "PictureTopic";

    mp_publisher = Domain::createPublisher(mp_participant,Wparam,(PublisherListener*)&m_listener);
    if (mp_publisher == nullptr)
    {
        return false;
    }

    return true;
}

PicturePublisher::~PicturePublisher()
{
    Domain::removeParticipant(mp_participant);
}

void PicturePublisher::PubListener::onPublicationMatched(
        Publisher* /*pub*/,MatchingInfo& info)
{
    if (info.status == MATCHED_MATCHING)
    {
        n_matched++;
        firstConnected = true;
        std::cout << "Publisher matched" << std::endl;
    }
    else
    {
        n_matched--;
        std::cout << "Publisher unmatched" << std::endl;
    }
}

void PicturePublisher::PartListener::onParticipantDiscovery(
        Participant*,ParticipantDiscoveryInfo&& info)
{
    if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
    }
    else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
    }
    else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
    }
}

void PicturePublisher::runThread(
        uint32_t samples,uint32_t sleep)
{
    uint32_t i = 0;

    while (!stop && (i < samples || samples == 0))
    {
        if (publish(samples != 0))
        {
            std::cout << "runThreading...; \tSample Index: " << m_Picture.index() << "; \t";

            std::cout << "size: " << m_Picture.size() << std::endl;


            if (i == 9){
                std::cout << "Structure message" << " with index: " << i + 1 << " SENT" << std::endl;
                // Avoid unmatched condition impact subscriber receiving message
                std::cout << "Wait within twenty second..." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(10000));
            }
            ++i;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
    }
}

void PicturePublisher::run(
        uint32_t samples,uint32_t sleep)
{
    stop = false;
    std::thread thread(&PicturePublisher::runThread,this,samples,sleep);
    if (samples == 0)
    {
        std::cout << "Publisher running. Please press enter to stop the Publisher at any time." << std::endl;
        std::cin.ignore();
        stop = true;
    }
    else
    {
        std::cout << "Publisher running " << samples << " samples." << std::endl;
    }
    thread.join();
}

bool PicturePublisher::publish(
        bool waitForListener)
{
    // std::cout << "m_listener.n_matched: " << m_listener.n_matched << std::endl;
    if (m_listener.firstConnected || !waitForListener || m_listener.n_matched > 0)
    {
        m_Picture.index(m_Picture.index() + 1);

        mp_publisher->write((void*)&m_Picture);

        return true;
    }
    return false;
}

订阅者.cpp

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License,Version 2.0 (the "License");

/**
 * @file Subscriber.cpp
 *
 */

#include "PictureSubscriber.h"
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/subscriber/Subscriber.h>
#include <fastrtps/Domain.h>
#include <fastrtps/types/DynamicTypeBuilderFactory.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicTypeBuilder.h>
#include <fastrtps/types/DynamicTypeBuilderPtr.h>
#include <fastrtps/types/DynamicType.h>

#include <vector>
#include <string>
#include <sstream>
#include <iterator>

#include <opencv2/opencv.hpp>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using namespace eprosima::fastrtps::types;

// using namespace cv;

PictureSubscriber::PictureSubscriber()
    : mp_participant(nullptr),mp_subscriber(nullptr)
{
}

struct timespec begin,end;
double elapsed;
std::vector<unsigned char> buffer;

bool PictureSubscriber::init()
{

    ParticipantAttributes PParam;
    PParam.rtps.setName("Picture_sub");
    mp_participant = Domain::createParticipant(PParam,&m_part_list);
    if (mp_participant == nullptr)
    {
        return false;
    }

    //REGISTER THE TYPE
    Domain::registerType(mp_participant,&m_type);

    //CREATE THE SUBSCRIBER
    SubscriberAttributes Rparam;
    Rparam.topic.topicKind = NO_KEY;
    Rparam.topic.topicDataType = "Picture";
    Rparam.topic.topicName = "PictureTopic";

    mp_subscriber = Domain::createSubscriber(mp_participant,Rparam,(SubscriberListener*)&m_listener);

    if (mp_subscriber == nullptr)
    {
        return false;
    }


    return true;
}

PictureSubscriber::~PictureSubscriber()
{
    Domain::removeParticipant(mp_participant);
}

void PictureSubscriber::SubListener::onSubscriptionMatched(
        Subscriber* /*sub*/,MatchingInfo& info)
{
    if (info.status == MATCHED_MATCHING)
    {
        n_matched++;
        std::cout << "Subscriber matched" << std::endl;
    }
    else
    {
        n_matched--;
        std::cout << "Subscriber unmatched" << std::endl;
    }
}

void PictureSubscriber::PartListener::onParticipantDiscovery(
        Participant*,ParticipantDiscoveryInfo&& info)
{
    if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " discovered" << std::endl;
    }
    else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " removed" << std::endl;
    }
    else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
    {
        std::cout << "Participant " << info.info.m_participantName << " dropped" << std::endl;
    }
}

void PictureSubscriber::SubListener::onNewDataMessage(
        Subscriber* sub)
{
    std::cout << "Data received." << std::endl;

    if (sub->takeNextData((void*)&m_Picture,&m_info))
    {
        if (m_info.sampleKind == ALIVE)
        {
            this->n_samples++;

            // Print your structure data here.
            uint32_t index = m_Picture.index();
            std::cout << "index: " << index <<  "; \t";

            std::cout << "size: " << m_Picture.size() <<  std::endl;

            cv::Mat imageDecoded = cv::imdecode(m_Picture.picture(),1);
            cv::imwrite(std::to_string(index) + "_dog_received.jpg",imageDecoded);
        }
    }
}

void PictureSubscriber::run()
{
    std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
    std::cin.ignore();
}

void PictureSubscriber::run(
        uint32_t number)
{
    std::cout << "Subscriber running until " << number << "samples have been received" << std::endl;
    while (number > this->m_listener.n_samples)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}