Problem description
When the Spark job is run locally, without Docker, everything works fine.
Running it in a Docker container, however, produces no output.
To check whether Kafka itself was working, I pulled Kafka onto the Spark worker container and pointed a console consumer at the same host, port, and topic (kafka:9092, crypto_topic); it ran correctly and showed output. (A producer in another container continuously pushes data to the topic.)
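For reference, that check was presumably something along these lines, using the console consumer script that ships with Kafka (the exact invocation isn't shown in the question; the script lives under the Kafka bin directory, e.g. /opt/kafka/bin):
# Run from inside the spark-consumer-worker container, against the same
# broker and topic the Spark job subscribes to:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic crypto_topic --from-beginning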
Expected -
20/09/11 17:35:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.29.10:42565 with 366.3 MB RAM, BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.29.10, 42565, None)
20/09/11 17:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.29.10, 42565, None)
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
|name_coin|symbol_coin|number_of_markets|volume|market_cap|total_supply|price|percent_change_24hr|timestamp|
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
+---------+-----------+-----------------+------+----------+------------+-----+-------------------+---------+
...
...
...
followed by more output
Actual -
20/09/11 14:49:44 INFO BlockManagerMasterEndpoint: Registering block manager d7443d94165c:46203 with 366.3 MB RAM, BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, d7443d94165c, 46203, None)
20/09/11 14:49:44 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
no more output, stuck here
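The Batch: 0 tables in the expected output are what Spark's console sink prints. The write side of the job isn't shown in the snippets below, but it is presumably a console-format writeStream on the same stream; a minimal sketch, assuming the Kafka value has already been parsed into the columns shown above:
// Write side that would produce the "Batch: 0" console tables.
// `inputDF` stands for the (parsed) streaming DataFrame from the
// readStream snippet shown further down.
val query = inputDF.writeStream
  .outputMode("append")        // emit newly arrived rows on each micro-batch
  .format("console")           // print each micro-batch as an ASCII table
  .option("truncate", "false") // don't cut off long column values
  .start()
query.awaitTermination()       // block the driver so the stream keeps running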
docker-compose.yml file
version: "3"
services:
  zookeeper:
    image: zookeeper:3.6.1
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    networks:
      - crypto-network
  kafka:
    image: wurstmeister/kafka:2.13-2.6.0
    container_name: kafka
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_PORT=9092
      # topic-name:partitions:in-sync-replicas:cleanup-policy
      - KAFKA_CREATE_TOPICS="crypto_topic:1:1:compact"
    networks:
      - crypto-network
  kafka-producer:
    image: python:3-alpine
    container_name: kafka-producer
    command: >
      sh -c "pip install -r /usr/src/producer/requirements.txt
      && python3 /usr/src/producer/kafkaProducerService.py"
    volumes:
      - ./kafkaProducer:/usr/src/producer
    networks:
      - crypto-network
  cassandra:
    image: cassandra:3.11.8
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    #command:
    #  cqlsh -f /var/lib/cassandra/cql-queries.cql
    volumes:
      - ./cassandraData:/var/lib/cassandra
    networks:
      - crypto-network
  spark-master:
    image: bde2020/spark-master:2.4.5-hadoop2.7
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
      - "6066:6066"
    networks:
      - crypto-network
  spark-consumer-worker:
    image: bde2020/spark-worker:2.4.5-hadoop2.7
    container_name: spark-consumer-worker
    environment:
      - SPARK_MASTER=spark://spark-master:7077
    ports:
      - "8081:8081"
    volumes:
      - ./sparkConsumer:/sparkConsumer
    networks:
      - crypto-network
networks:
  crypto-network:
    driver: bridge
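A quick way to confirm that KAFKA_CREATE_TOPICS actually created the topic is to list topics from inside the kafka container; a sketch (kafka-topics.sh sits in the Kafka bin directory, e.g. /opt/kafka/bin, if it isn't already on the PATH):
docker exec -it kafka kafka-topics.sh --bootstrap-server kafka:9092 --list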
Relevant part of the Spark code
val inputDF: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","kafka:9092")
.option("subscribe","crypto_topic")
.load()
Fix
This part of the code is actually:
val inputDF: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",KAFKA_BOOTSTRAP_SERVERS)
.option("subscribe",KAFKA_TOPIC)
.load()
where KAFKA_BOOTSTRAP_SERVERS and KAFKA_TOPIC are read from a config file at the time the jar is packaged locally.
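The question doesn't show how that config file is read; a minimal sketch of the mechanism, assuming Typesafe Config with an application.conf bundled into the jar (the key names here are made up for illustration):
import com.typesafe.config.ConfigFactory

// ConfigFactory.load() reads application.conf from the classpath -- that is,
// from inside the packaged jar -- which is why editing the file on disk in
// the container has no effect once the jar is built.
val conf = ConfigFactory.load()
val KAFKA_BOOTSTRAP_SERVERS = conf.getString("kafka.bootstrapServers") // hypothetical key
val KAFKA_TOPIC             = conf.getString("kafka.topic")            // hypothetical key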
What helped me debug this most was turning the logging up to a more verbose level.
Locally, KAFKA_BOOTSTRAP_SERVERS was localhost:9092. In the Docker container the value in that config file had been changed to kafka:9092, but because the JAR was already packaged, the change was never picked up. So changing the value to kafka:9092 before packaging locally fixed it.
I would really appreciate any help on how to make the JAR pick up the configuration dynamically; I don't want to have to package it with SBT on the Docker container.
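For what it's worth, one common way to avoid repackaging is to let an environment variable override whatever is baked into the jar, so docker-compose can inject the right value through its environment: section; a sketch (variable and key names hypothetical):
import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.load()

// Prefer an environment variable when one is set (e.g. injected by
// docker-compose), falling back to the value packaged inside the jar.
val KAFKA_BOOTSTRAP_SERVERS =
  sys.env.getOrElse("KAFKA_BOOTSTRAP_SERVERS", conf.getString("kafka.bootstrapServers"))
val KAFKA_TOPIC =
  sys.env.getOrElse("KAFKA_TOPIC", conf.getString("kafka.topic"))
If the config is read with Typesafe Config, an external file can also be pointed at without any code change via the config.file system property, e.g. passing --driver-java-options "-Dconfig.file=/sparkConsumer/application.conf" to spark-submit, assuming such a file is mounted into the container.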