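Spark: How to get random unique rows

Problem description

I need a way to get x random rows from a dataset whose rows are unique. I tried the sample method of the Dataset class, but it sometimes picks duplicate rows.

The Dataset sample method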

https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/sql/Dataset.html#sample-boolean-double-

Solution

The sample function with withReplacement set to false always picks distinct rows: each input row is selected at most once, so a dataset of unique rows yields a sample without duplicates.

sample(boolean withReplacement, double fraction)

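Consider the following example:

df1.sample(false, 0.1).show()

With withReplacement set to true, the sample can contain duplicate rows, which can be verified by comparing the total count with the distinct count. With withReplacement set to false, it does not.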
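To make the difference concrete without a cluster, here is a minimal plain-Java sketch of the two modes. It is a simplified model and not Spark's implementation: the class `SamplingModel` and its methods are hypothetical names, and the with-replacement branch simply draws a fixed number of times from the input. The duplicate check mirrors the "verify by count" idea above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;

class SamplingModel {

    // Without replacement: every row is kept independently with probability
    // `fraction`, so a row can appear at most once in the result.
    static List<Integer> sampleWithoutReplacement(List<Integer> rows, double fraction, long seed) {
        Random rng = new Random(seed);
        List<Integer> out = new ArrayList<>();
        for (Integer row : rows) {
            if (rng.nextDouble() < fraction) {
                out.add(row);
            }
        }
        return out;
    }

    // With replacement (simplified assumption): draw round(n * fraction) times
    // from the whole input, so the same row may be drawn more than once.
    static List<Integer> sampleWithReplacement(List<Integer> rows, double fraction, long seed) {
        Random rng = new Random(seed);
        int draws = (int) Math.round(rows.size() * fraction);
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < draws; i++) {
            out.add(rows.get(rng.nextInt(rows.size())));
        }
        return out;
    }

    // Duplicate check by counting: total count versus distinct count.
    static boolean hasDuplicates(List<Integer> sample) {
        return new HashSet<>(sample).size() != sample.size();
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(i);
        }
        List<Integer> a = sampleWithoutReplacement(rows, 0.1, 42L);
        List<Integer> b = sampleWithReplacement(rows, 0.1, 42L);
        System.out.println("without replacement: count=" + a.size() + ", duplicates=" + hasDuplicates(a));
        System.out.println("with replacement:    count=" + b.size() + ", duplicates=" + hasDuplicates(b));
    }
}
```

Note that the without-replacement count varies from run to run with the seed; only the absence of duplicates is guaranteed.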

You should use the sample function with withReplacement set to false, for example sample(false, fraction).

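However, sample is not guaranteed to return exactly the given fraction of the dataset's total row count. To get exactly X rows, first sample with the sample function and then take X rows from the sampled data.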
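The "sample, then take X" step can be sketched in plain Java as follows. This is a model under stated assumptions, not Spark code: `ExactSampleModel`, `sampleAtMost` and the `oversample` factor are hypothetical names introduced for illustration. In Spark itself the same idea would presumably be written as `df.sample(false, fraction).limit(x)`, with the fraction chosen somewhat larger than x divided by the row count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class ExactSampleModel {

    // Bernoulli sampling without replacement: keep each row with probability `fraction`.
    static <T> List<T> sample(List<T> rows, double fraction, Random rng) {
        List<T> out = new ArrayList<>();
        for (T row : rows) {
            if (rng.nextDouble() < fraction) {
                out.add(row);
            }
        }
        return out;
    }

    // Oversample by the given factor, then keep at most x rows of the sample.
    // The result never exceeds x rows, but it can be shorter if the sample came up short.
    static <T> List<T> sampleAtMost(List<T> rows, int x, double oversample, long seed) {
        if (rows.isEmpty() || x <= 0) {
            return new ArrayList<>();
        }
        double fraction = Math.min(1.0, oversample * x / rows.size());
        List<T> sampled = sample(rows, fraction, new Random(seed));
        return new ArrayList<>(sampled.subList(0, Math.min(x, sampled.size())));
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(i);
        }
        List<Integer> picked = sampleAtMost(rows, 50, 2.0, 7L);
        System.out.println("picked " + picked.size() + " rows: " + picked);
    }
}
```

Two caveats apply to this approach: if the sample happens to contain fewer than X rows, fewer than X come back, and because the truncation keeps the first rows of the sample, a large fraction makes the result lean toward rows that appear early in the input.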