Java封装 bigQuery 查询工具类,有用

共 10255字,需浏览 21分钟

 ·

2022-03-06 00:43

点击上方“程序员大白”,选择“星标”公众号

重磅干货,第一时间送达

缘起

最近在公司基于bigQuery开发埋点数据分析功能,所以总结一下自己封装的bigQuery查询工具类(网上关于bigQuery的文章比较少)

关于bigQuery的概念功能可以参考:

https://cloud.google.com/bigquery/docs?hl=zh-CN

在示例那包含了很多操作

示例中一段查询代码

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;

public class SimpleApp {
  public static void main(String... args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                "SELECT commit, author, repo_name "
                    + "FROM `bigquery-public-data.github_repos.commits` "
                    + "WHERE subject like '%bigquery%' "
                    + "ORDER BY subject DESC LIMIT 10")
            // Use standard SQL syntax for queries.
            // See: https://cloud.google.com/bigquery/sql-reference/
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      // String type
      String commit = row.get("commit").getStringValue();
      // Record type
      FieldValueList author = row.get("author").getRecordValue();
      String name = author.get("name").getStringValue();
      String email = author.get("email").getStringValue();
      // String Repeated type
      String repoName = row.get("repo_name").getRecordValue().get(0).getStringValue();
      System.out.printf(
          "Repo name: %s Author name: %s email: %s commit: %s\n", repoName, name, email, commit);
    }
  }
}

以上这段查询的代码,给我的感觉就是步骤都是类似的,都是创建一个JOB,等待查询,处理查询的结果集。如果我每写一个查询都要写这一大坨,那简直要恶心死。

其实以上的查询代码除了SQL和结果集的处理不一样,其他的都是一样,基于这一点来封装一个 bigQuery 查询工具类,想要达到的效果就是:我给你一段SQL,你给我处理好的结果。

Maven 依赖

首先需要引入 Maven 依赖,当然这个在 bigQuery 官方示例上是有的

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloudgroupId>
      <artifactId>libraries-bomartifactId>
      <version>24.0.0version>
      <type>pomtype>
      <scope>importscope>
    dependency>
  dependencies>
dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloudgroupId>
    <artifactId>google-cloud-bigqueryartifactId>
  dependency>

设置身份验证

要运行客户端库,必须先设置身份验证,也就是说需要你的服务帐号密钥才能去连接和操作bigQuery,官方文档中也说到这点,他提供的方案是:设置环境变量 GOOGLE_APPLICATION_CREDENTIALS 向应用代码提供身份验证凭据

我觉得这种方式对于我们来说不太友好,我不能在服务器上设置环境变量,我们现在都是微服务,部署在k8s上,所以这种方案也不知道如何使用(会这种方式的小伙伴一定要告诉我怎么使用)

所以我的做法是将 身份验证凭据 JSON 文件加在项目 resource 里,通过流的方式读取凭据。

@Value(value = "classpath:netpop-e792a-data-analytics.json")
private Resource dataAnalyticsResource;

配置 BigQuery Bean

由上面那段查询案例可知,重要的一个 Bean 就是 BigQuery,所以把这个Bean 注册到IOC容器中。

@Configuration
public class BigQueryConfiguration {

    // 加载 身份验证凭据
    @Value(value = "classpath:netpop-e792a-data-analytics.json")
    private Resource dataAnalyticsResource;

    // 配置核心Bean
    @Bean
    BigQuery bigQuery() throws IOException {
        GoogleCredentials credentials = GoogleCredentials.fromStream(dataAnalyticsResource.getInputStream());
        BigQuery bigquery = BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
        return bigquery;
    }

    // 将bigQuery 分装工具类注册到IOC容器中
    @Bean
    BigQueryHelper bigQueryHelper(@Autowired BigQuery bigQuery) {
        return new BigQueryHelper(bigQuery);
    }

}

工具类


package groot.data.analysis.support;

import com.google.cloud.bigquery.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.util.*;

/**
 * @Classname BigQueryHelper
 * @Description
 * @Created by wangchangjiu
 */

@Slf4j
public class BigQueryHelper {

    private BigQuery bigQuery;

    public BigQueryHelper() {
    }

    public BigQueryHelper(BigQuery bigQuery) {
        this.bigQuery = bigQuery;
    }


    /**
     * 获取列表 返回类型的字段不支持复杂类型
     *
     * @param sql
     * @param returnType
     * @param 
     * @return
     * @throws InterruptedException
     */

    public  List queryForList(String sql, Class returnType) throws InterruptedException {
        TableResult result = execute(sql);
        Map fieldMap = getStringFieldMap(result);
        List results = new ArrayList<>();
        result.iterateAll().forEach(row -> {
            T returnObj;
            try {
                returnObj = returnType.getDeclaredConstructor().newInstance();
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                throw new RuntimeException("reflect create object error :", ex);
            }
            ReflectionUtils.doWithFields(returnType, field -> {
                Field bigQueryField = fieldMap.get(field.getName());
                if (bigQueryField != null) {
                    FieldValue fieldValue = row.get(bigQueryField.getName());
                    if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                        throw new UnsupportedOperationException("unsupported returnType field include complex types");
                    }
                    field.setAccessible(true);
                    ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                }
            });
            results.add(returnObj);
        });
        return results;
    }

    /**
     *  字段名和字段映射
     * @param result
     * @return
     */

    private Map getStringFieldMap(TableResult result) {
        FieldList fieldList = result.getSchema().getFields();
        Map fieldMap = new HashMap<>(fieldList.size());
        for (int i = 0; i < fieldList.size(); i++) {
            Field field = fieldList.get(i);
            fieldMap.put(field.getName(), field);
        }
        return fieldMap;
    }

    /**
     *  执行SQL 获取结果集
     * @param sql
     * @return
     * @throws InterruptedException
     */

    private TableResult execute(String sql) throws InterruptedException {
        Assert.notNull(sql, "SQL must not be null");
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();

        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        // Wait for the query to complete.
        queryJob = queryJob.waitFor();

        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }
        // Get the results.
        return queryJob.getQueryResults();
    }


    /**
     *  查询列表,实现 ResultSetExtractor 接口 自定义提取数据
     * @param sql
     * @param rse
     * @param 
     * @return
     * @throws InterruptedException
     */

    public  List queryForList(String sql, ResultSetExtractor rse) throws InterruptedException {
        TableResult tableResult = execute(sql);
        List results = new ArrayList<>();
        tableResult.iterateAll().forEach(row -> results.add(rse.extractData(row)));
        return results;
    }

    /**
     *  查询返回单个结果集
     * @param sql
     * @param returnType
     * @param 
     * @return
     * @throws InterruptedException
     */

    public  queryForSingleResult(String sql, Class returnType) throws InterruptedException {
        TableResult tableResult = execute(sql);
        if (tableResult.iterateAll().iterator().hasNext()) {
            // 只有一行
            FieldValueList fieldValues = tableResult.iterateAll().iterator().next();
            if (isBasicType(returnType)) {
                return (T) resultWrapper(fieldValues.get(0), returnType);
            } else {
                T returnObj;
                try {
                    returnObj = returnType.getDeclaredConstructor().newInstance();
                } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                    throw new RuntimeException("reflect create object error :", ex);
                }

                Map fieldMap = getStringFieldMap(tableResult);
                ReflectionUtils.doWithFields(returnType, field -> {
                    Field bigQueryField = fieldMap.get(field.getName());
                    if (bigQueryField != null) {
                        FieldValue fieldValue = fieldValues.get(bigQueryField.getName());
                        if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                            throw new UnsupportedOperationException("unsupported returnType field include complex types");
                        }
                        field.setAccessible(true);
                        ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                    }
                });
                return returnObj;
            }
        }
        return null;
    }

    /**
     *  结果类型处理
     * @param fieldValue
     * @param returnType
     * @return
     */

    private Object resultWrapper(FieldValue fieldValue, Class returnType) {
        if (returnType == Boolean.class || returnType == boolean.class{
            return fieldValue.getBooleanValue();
        } else if (returnType == Long.class || returnType == long.class{
            return fieldValue.getLongValue();
        } else if (returnType == Double.class || returnType == double.class{
            return fieldValue.getDoubleValue();
        } else if (returnType == BigDecimal.class{
            return fieldValue.getNumericValue();
        } else if (returnType == String.class{
            return fieldValue.getStringValue();
        }
        return fieldValue.getValue();
    }

    /**
     *  判断是否是简单类型
     * @param returnType
     * @param 
     * @return
     */

    private  boolean isBasicType(Class returnType) {
        return returnType == String.class || returnType.isPrimitive()
                || returnType 
== Boolean.class || returnType == Byte.class
                || returnType 
== Integer.class || returnType == Long.class
                || returnType 
== Double.class || returnType == Short.class
                || returnType 
== Float.class || returnType == BigDecimal.class;
    }

}

这里对外主要提供了

// 获取列表 返回类型的字段不支持复杂类型
public  List queryForList(String sql, Class returnType) throws InterruptedException 

// 查询列表,实现 ResultSetExtractor 接口 自定义提取数据
public  List queryForList(String sql, ResultSetExtractor rse) throws InterruptedException

// 查询返回单个结果集
public  T queryForSingleResult(String sql, Class returnType) throws InterruptedException 

我这里主要的思想就是利用反射创建目标对象,将字段赋值进去

当然这里不支持返回类型是对象嵌套对象的形式,原因是那种比较复杂,而且我现在这里也没有这种场景。

还有就是这里没有支持分页等其他操作

使用工具类

使用的话就是注入 bigQueryHelper 工具类

以上就是简单的一个bigQuery 分装类,当然还可以进一步优化封装的更全一点。

来源:juejin.cn/post/7036337598025072653


13个你一定要知道的PyTorch特性

解读:为什么要做特征归一化/标准化?

一文搞懂 PyTorch 内部机制

张一鸣:每个逆袭的年轻人,都具备的底层能力




西[]


浏览 44
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报