欢迎来到徐庆高(Tea)的个人博客网站
磨难很爱我,一度将我连根拔起。从惊慌失措到心力交瘁,我孤身一人,但并不孤独无依。依赖那些依赖我的人,信任那些信任我的人,帮助那些给予我帮助的人。如果我愿意,可以分裂成无数面镜子,让他们看见我,就像看见自己。察言观色和模仿学习是我的领域。像每个深受创伤的人那样,最终,我学会了随遇而安。
当前位置: 日志文章 > 详细内容

使用Python与BigQuery进行交互的代码详解

2025年04月08日 Python
选择合适的 python 库在使用 bigquery 时,您可以根据自己的需求选择以下三个 python 库:bigquery dataframe:通过服务器端处理,支持 pandas 和 sciki

选择合适的 python 库

在使用 bigquery 时,您可以根据自己的需求选择以下三个 python 库:

  • bigquery dataframe:通过服务器端处理,支持 pandas 和 scikit-learn api,适合数据处理和机器学习任务。
  • pandas-gbq:客户端库,用于在 python 中读写 bigquery 数据,适合简单的数据处理和分析。
  • google-cloud-bigquery:google 维护的库,提供完整的 bigquery api 功能,适合复杂的数据管理和分析。

安装库

要使用这些库,您需要安装以下包:

pip install --upgrade pandas-gbq 'google-cloud-bigquery[bqstorage,pandas]'

运行查询

使用 googlesql 语法

以下示例展示了如何使用 pandas-gbq 和 google-cloud-bigquery 运行 googlesql 查询:

pandas-gbq

import pandas

sql = """
    select name
    from `bigquery-public-data.usa_names.usa_1910_current`
    where state = 'tx'
    limit 100
"""

# 使用标准 sql 查询
df = pandas.read_gbq(sql, dialect="standard")

# 指定项目 id
project_id = "your-project-id"
df = pandas.read_gbq(sql, project_id=project_id, dialect="standard")

google-cloud-bigquery

from google.cloud import bigquery

client = bigquery.client()
sql = """
    select name
    from `bigquery-public-data.usa_names.usa_1910_current`
    where state = 'tx'
    limit 100
"""

# 使用标准 sql 查询
df = client.query(sql).to_dataframe()

# 指定项目 id
project_id = "your-project-id"
df = client.query(sql, project=project_id).to_dataframe()

使用旧版 sql 语法

如果需要使用旧版 sql 语法,可以通过以下方式进行:

pandas-gbq

import pandas

sql = """
    select name
    from [bigquery-public-data:usa_names.usa_1910_current]
    where state = 'tx'
    limit 100
"""

df = pandas.read_gbq(sql, dialect="legacy")

google-cloud-bigquery

from google.cloud import bigquery

client = bigquery.client()
sql = """
    select name
    from [bigquery-public-data:usa_names.usa_1910_current]
    where state = 'tx'
    limit 100
"""
query_config = bigquery.queryjobconfig(use_legacy_sql=true)

df = client.query(sql, job_config=query_config).to_dataframe()

使用 bigquery storage api 加速数据下载

bigquery storage api 可以显著提高大型结果的下载速度。以下示例展示了如何使用此 api:

pandas-gbq

import pandas

sql = "select * from `bigquery-public-data.irs_990.irs_990_2012`"

# 使用 bigquery storage api 加速下载
df = pandas.read_gbq(sql, dialect="standard", use_bqstorage_api=true)

google-cloud-bigquery

from google.cloud import bigquery

client = bigquery.client()
sql = "select * from `bigquery-public-data.irs_990.irs_990_2012`"

# 如果 bigquery storage api 已启用,则自动使用
df = client.query(sql).to_dataframe()

配置查询

参数化查询

以下示例展示了如何使用参数化查询:

pandas-gbq

import pandas

sql = """
    select name
    from `bigquery-public-data.usa_names.usa_1910_current`
    where state = @state
    limit @limit
"""
query_config = {
    "query": {
        "parametermode": "named",
        "queryparameters": [
            {
                "name": "state",
                "parametertype": {"type": "string"},
                "parametervalue": {"value": "tx"},
            },
            {
                "name": "limit",
                "parametertype": {"type": "integer"},
                "parametervalue": {"value": 100},
            },
        ],
    }
}

df = pandas.read_gbq(sql, configuration=query_config)

google-cloud-bigquery

from google.cloud import bigquery

client = bigquery.client()
sql = """
    select name
    from `bigquery-public-data.usa_names.usa_1910_current`
    where state = @state
    limit @limit
"""
query_config = bigquery.queryjobconfig(
    query_parameters=[
        bigquery.scalarqueryparameter("state", "string", "tx"),
        bigquery.scalarqueryparameter("limit", "integer", 100),
    ]
)

df = client.query(sql, job_config=query_config).to_dataframe()

将 pandas dataframe 加载到 bigquery 表中

以下示例展示了如何将 pandas dataframe 加载到 bigquery 表中:

pandas-gbq

import pandas

df = pandas.dataframe(
    {
        "my_string": ["a", "b", "c"],
        "my_int64": [1, 2, 3],
        "my_float64": [4.0, 5.0, 6.0],
        "my_timestamp": [
            pandas.timestamp("1998-09-04t16:03:14"),
            pandas.timestamp("2010-09-13t12:03:45"),
            pandas.timestamp("2015-10-02t16:00:00"),
        ],
    }
)
table_id = "my_dataset.new_table"

df.to_gbq(table_id)

google-cloud-bigquery

from google.cloud import bigquery
import pandas

df = pandas.dataframe(
    {
        "my_string": ["a", "b", "c"],
        "my_int64": [1, 2, 3],
        "my_float64": [4.0, 5.0, 6.0],
        "my_timestamp": [
            pandas.timestamp("1998-09-04t16:03:14"),
            pandas.timestamp("2010-09-13t12:03:45"),
            pandas.timestamp("2015-10-02t16:00:00"),
        ],
    }
)
client = bigquery.client()
table_id = "my_dataset.new_table"

# 确保正确的数据类型
job_config = bigquery.loadjobconfig(
    schema=[
        bigquery.schemafield("my_string", "string"),
    ]
)

job = client.load_table_from_dataframe(df, table_id, job_config=job_config)

# 等待加载完成
job.result()

pandas-gbq 的局限性

  • 数据集管理:不支持创建、更新或删除数据集。
  • 数据格式支持:仅支持 csv 格式,不支持嵌套值或数组值。
  • 表管理:不支持列出表、复制表或删除表。
  • 数据导出:不支持直接导出数据到 cloud storage。

解决连接池错误

如果遇到连接池错误,可以通过以下方式增加连接池大小:

import requests

client = bigquery.client()
adapter = requests.adapters.httpadapter(pool_connections=128, pool_maxsize=128, max_retries=3)
client._http.mount("https://", adapter)
client._http._auth_request.session.mount("https://", adapter)

以上就是使用python与bigquery进行交互的代码详解的详细内容,更多关于python与bigquery交互的资料请关注代码网其它相关文章!