问题描述
我正在使用 azure databricks 开发流式应用程序。我正在连接到融合的 kafka 并使用模式(来自模式注册表)读取流和反序列化 avro 数据并将数据保存到增量表。 但是我有一个要求,其中主题可以有多个架构,并且流在每一行上都包含架构 ID。我需要检查 schemaid 并应用该行的特定架构并将其保存到增量表。 我尝试了多种方法,例如 - 我已将 schema_id 传递给函数并获取架构并将其应用于如下数据 -
# X = see above
pca = PCA(n_components=15).fit(X.reshape((X.shape[0],-1)))
pcs = pca.components_.reshape((-1,X.shape[1],X.shape[2]))
img,loadings = X[1],pca.transform(X[1].reshape(-1,1)).T
ORIG,DIFF,RECN,= [],[],[]
reconstructed,distortion,frames = np.zeros_like(X[0]),[]
for i in range(len(pca.components_)):
# Reconstruct image using the first i principal components
reconstructed += loadings[i].reshape(img.shape) * pca.components_[i].reshape(img.shape)
distortion.append(np.sum((img - reconstructed) ** 2))
# Append animation frame every 5'th reconstruction
if i % 2 == 0 or i == pca.n_components_-1:
ORIG = np.append(ORIG,img)
DIFF = np.append(DIFF,(img - reconstructed).copy())
RECN = np.append(RECN,reconstructed.copy())
DATA = np.array([np.reshape(ORIG,(8,16,16)),np.reshape(DIFF,np.reshape(RECN,16))])
fig0 = px.imshow(DATA,animation_frame=1,facet_col=0,binary_string=True)
fig0.show()
print(fig0) #inspect the layout
我收到错误,如列不可迭代
我无法通过流使用 foreach 循环来执行此操作。我使用了 foreachwriter 类,在此过程中我正在调用 from_avro()
kafka_output_df = kafka_input_df.select(
col("key"),col("value"),col("topic"),col("partition"),col("offset"),col("timestamp"),col("timestampType"),col("valueSchemaId"),col("fixedValue"),getSchema_fromId_udf(col("valueSchemaId")).alias("schema")
)
kafka_output_df=kafka_output_df.select(from_avro(col("fixedValue"),col("schema")).alias("data"))
但它给了我错误 -'AttributeError: 'nonetype' object has no attribute '_jvm'',
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)