Docker容器中的Spark无法读取Kafka输入-结构化流

问题描述

当通过#invoke command from lambda client.start_notebook_instance(NotebookInstanceName='<sagemaker_instance_name>') 在没有Docker的情况下在本地运行Spark作业时,一切正常。 但是,在docker容器上运行将导致不生成任何输出

要查看Kafka本身是否正常运行,我将Kafka提取到了Spark worker容器上,并让控制台用户收听了正确运行的同一主机,端口和主题(kafka:9092,crypto_topic),并显示输出。 (有一个生产者不断将数据推送到另一个容器中的主题

预期-

#!/bin/bash

set -e

ENVIRONMENT=JupyterSystemEnv
#JupyterSystemEnv
NOTEBOOK_FILE=/home/ec2-user/SageMaker/XGBoost_training.ipynb

source /home/ec2-user/anaconda3/bin/activate "$ENVIRONMENT"

nohup jupyter nbconvert --ExecutePreprocessor.timeout=-1 --ExecutePreprocessor.kernel_name=python3 --to notebook --execute "$NOTEBOOK_FILE" &

实际

spark-submit

docker-compose.yml文件

20/09/11 17:35:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.29.10:42565 with 366.3 MB RAM,BlockManagerId(driver,192.168.29.10,42565,None)
20/09/11 17:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver,None)
20/09/11 17:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver,None)
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
|name_coin|symbol_coin|number_of_markets|volume|market_cap|total_supply|price|percent_change_24hr|timestamp|
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
...
...
...
followed by more output

20/09/11 14:49:44 INFO BlockManagerMasterEndpoint: Registering block manager d7443d94165c:46203 with 366.3 MB RAM,d7443d94165c,46203,None) 20/09/11 14:49:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver,None) 20/09/11 14:49:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver,None) 20/09/11 14:49:44 INFO StandaloneschedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 no more output,stuck here

运行
version: "3"

services:

    zookeeper:
        image: zookeeper:3.6.1
        container_name: zookeeper
        hostname: zookeeper
        ports:
            - "2181:2181"
        networks:
            - crypto-network
      
    kafka:
        image: wurstmeister/kafka:2.13-2.6.0
        container_name: kafka
        hostname: kafka
        ports:
            - "9092:9092"
        environment:
            - KAFKA_ADVERTISED_HOST_NAME=kafka
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_ADVERTISED_PORT=9092
            # topic-name:partitions:in-sync-replicas:cleanup-policy
            - KAFKA_CREATE_TOPICS="crypto_topic:1:1:compact"
        networks:
            - crypto-network

    kafka-producer:
        image: python:3-alpine
        container_name: kafka-producer
        command: >
                sh -c "pip install -r /usr/src/producer/requirements.txt
                && python3 /usr/src/producer/kafkaProducerService.py"
        volumes:
            - ./kafkaProducer:/usr/src/producer
        networks: 
            - crypto-network
      
            
    cassandra:
        image: cassandra:3.11.8
        container_name: cassandra
        hostname: cassandra
        ports:
            - "9042:9042"
        #command:
        #    cqlsh -f /var/lib/cassandra/cql-queries.cql
        volumes:
            - ./cassandraData:/var/lib/cassandra

        networks:
            - crypto-network
            
    spark-master:
        image: bde2020/spark-master:2.4.5-hadoop2.7
        container_name: spark-master
        hostname: spark-master
        ports:
            - "8080:8080"
            - "7077:7077"
            - "6066:6066"
        networks:
            - crypto-network
            
    spark-consumer-worker:
        image: bde2020/spark-worker:2.4.5-hadoop2.7
        container_name: spark-consumer-worker
        environment:
            - SPARK_MASTER=spark://spark-master:7077
        ports:
            - "8081:8081"
        volumes:
            - ./sparkConsumer:/sparkConsumer
        networks:
            - crypto-network
    
            
networks:
  crypto-network:
    driver: bridge

Spark代码的相关部分

spark-submit

解决方法

  val inputDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","kafka:9092")
    .option("subscribe","crypto_topic")
    .load()

这部分代码实际上是

  val inputDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe",KAFKA_TOPIC)
    .load()

在本地打包jar时,从配置文件中读取KAFKA_BOOTSTRAP_SERVERSKAFKA_TOPIC的位置。

为我调试的最佳方法是将日志设置为更详细。

在本地,KAFKA_BOOTSTRAP_SERVERS的值为localhost:9092,但是在Docker容器中,该配置文件中的值已更改为kafka:9092。但是,由于JAR已打包,因此没有反映出来。因此,在本地打包时将其值更改为kafka:9092即可。

我将非常感谢有关如何使JAR动态获取配置的任何帮助。我不想通过SBT在Docker容器上打包。