Spring Cloud Stream源-不拉

问题描述

我正在尝试开发定制的Source,以用于Spring Cloud Dataflow中的概念验证。

我设法正确地部署了它,但是似乎没有将bean拉出。

这是父级 pom.xml

的一部分
...
<properties>
  <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
  ...
</properties>

<dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
</dependencyManagement>
...

这是项目 pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-cloud-connectors</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.3.4.RELEASE</version>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-configuration-processor</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-app-starter-Metadata-maven-plugin</artifactId>
            <version>2.0.2.RELEASE</version>
            <executions>
                <execution>
                    <id>aggregate-Metadata</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>aggregate-Metadata</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

SourceApplication.java

@SpringBootApplication
@EnableBinding(Source.class)
@EnableConfigurationProperties(ReportingProperties.class)
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class,args);
    }
}

ReportingProperties.java

@Validated
@ConfigurationProperties("reporting-properties")
public class ReportingProperties {

    /**
     * The starting date of the reporting.
     */
    private LocalDateTime fromDate = LocalDateTime.Now().minusDays(1000);

    /**
     * The end date of the reporting.
     */
    private LocalDateTime toDate = LocalDateTime.Now();

    public LocalDateTime getFromDate() {
        return fromDate;
    }

    public ReportingProperties setFromDate(LocalDateTime fromDate) {
        this.fromDate = fromDate;
        return this;
    }

    public LocalDateTime getToDate() {
        return toDate;
    }

    public ReportingProperties setToDate(LocalDateTime toDate) {
        this.toDate = toDate;
        return this;
    }
}

最后是服务:

@Configuration
@EnableBinding(Source.class)
public class PullUseRSService {

    @Bean
    @Publisher(channel = Source.OUTPUT)
    @SendTo(Source.OUTPUT)
    public supplier<String> pullUsers() {
        return () -> "Test";
    }

}

我想知道如何触发拉动机制,以便在部署时可以在日志中看到“测试” (我相信SCDF上的所有设置都正确设置,如果我执行“ time | log”,我可以在日志中看到一些结果,但是如果执行“ myservice | log”,则什么也不会出现。

我在做什么错? (也许我的代码有些冗余)

解决方法

有趣的是让您使用它的原因:

@Publisher(channel = Source.OUTPUT)
@SendTo(Source.OUTPUT)

如果您看一下您提到的time源代码,就会看到类似这样的内容:

@PollableSource
public String publishTime() {
    return new SimpleDateFormat(this.triggerProperties.getDateFormat()).format(new Date());
}

请考虑改用@PollableSource,而不要使用Supplier。 关键是您当前使用的所有注释都无法进行轮询。

@Publisher仅在调用方法时有效。 @SendTo在这里被完全忽略,因为@Publisher的作用完全相同,并且“发送到”。