问题描述
我需要动态读取Parquet文件并提取唯一记录。每个文件可以包含1个或多个键列。
-
假设文件将具有1个关键列,我使用
def get(self,username): """Get user by username""" user = get_user_by_username(username,env,True) if len(user) > 0: return namespace.marshal(user,marshal_model),200 return {},404
参数设计了以下数据流。
我尝试将名称中的值作为硬编码值(地址ID)给出, 有效。
有人可以帮助我如何动态地使用AddressId(此参数为键列名称的参数值)重命名此ID吗?
此外,当键列为1时,上述情况也是可能的。 当键列超过1个并动态处理时,是否可以使用Azure数据工厂来处理方案?
根据此,我们将使用adf或ADB。
数据流代码:
ID
DataFlow脚本
{
"name": "RemoveDuplicateRows","properties": {
"type": "MappingDataFlow","typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DS_Parquet_DF","type": "DatasetReference"
},"name": "source1"
}
],"sinks": [
{
"dataset": {
"referenceName": "DS_Parquet_Cleaned","name": "sink1"
}
],"transformations": [
{
"name": "Aggregate1"
},{
"name": "Select1"
}
],"script": "parameters{\n\tID as string ('AddressID')\n}\nsource(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\tpartitionBy('roundRobin',2)) ~> source1\nsource1 aggregate(groupBy(ID = byName($ID)),\n\teach(match(name!=$ID),$$ = first($$))) ~> Aggregate1\nAggregate1 select(mapColumn(\n\t\teach(match(name=='ID'),\n\t\t\t'AddressID' = $$),\n\t\teach(match(name!='ID'))\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select1\nSelect1 sink(allowSchemaDrift: true,\n\ttruncate: true,2),\n\tskipDuplicateMapOutputs: true) ~> sink1"
}
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)