Flink如何做维表关联?

使用

RichAsyncFunction 加 CacheBuilder


CacheBuilder.newBuilder()
        //最多存储10000条
        .maximumSize(10000)
        //过期时间为1分钟
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();
public class LRU extends RichAsyncFunction<String,Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //创建hbase客户端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存储10000条
                .maximumSize(10000)
                //过期时间为1分钟
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }
    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
        JSONObject jsonObject = JSONObject.parSEObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //读缓存
        String cacheCityName = cache.getIfPresent(cityId);
        //如果缓存获取失败再从hbase获取维度数据
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {
            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<keyvalue>>) arg -> {
                for (keyvalue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });
        }
    }
}

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...