背景
在工作过程中要定期的更新excel表的信息,每个星期都要去查询strarocks的数据导出结果到excel,俗话说:“不会偷懒的运维不是好运维”,于是写了python小程序解决这个重复的工作,设置定时任务,直接去服务器下载导出的excel表格即可。
代码
代码的逻辑简单介绍:将要执行的sql以名称进行区分保存并放到目录:sqlfiledir,设置结果存放路径:./…/outputdir/。python先查询数据,然后以sql文件名前缀为excle名称保存,最后移动到指定目录。
本地调试界面:
# -*- coding: utf-8 -*- # @author : zjh # @time : 2023-12-27 # @description: 定时跑数据保存到excel import os import shutil import pandas as pd import pymysql import openpyxl import datetime class starrocksexporter(object): def __init__(self, host, port, database, user, password, query, srcdir, destdir,filename): self.host = host self.port = port self.database = database self.user = user self.password = password self.query = query self.srcdir = srcdir self.destdir = destdir self.filename = filename self.writer = pd.excelwriter(filename+str('.xlsx')) def export_to_excel(self): df = pd.read_sql(self.query, self.engine) # print(df) df.to_excel(self.writer, sheet_name=self.filename, index=false) self.writer.save() def move_to_dest(self): if not os.path.isdir(self.destdir): self.destdir = os.mkdir(self.destdir) file_list = os.listdir(self.srcdir) for file in file_list: #print(file) #print(file.split('.')[0]) try: #print(file.split('.')[1]) fiel_str = file.split('.')[1] if fiel_str == 'xlsx': shutil.move(str(self.srcdir) + file, str(self.destdir) + file) except exception: print("没有后缀的文件:",file) #shutil.move(str(self.srcdir) + file, str(self.destdir) + file) def execute(self): with pymysql.connect(host=self.host,port=self.port,database=self.database,user=self.user,password=self.password) as engine: self.engine = engine self.query_star_rock(self.query) self.export_to_excel() self.move_to_dest() def query_star_rock(self, query): cursor = self.engine.cursor() cursor.execute(query) results = cursor.fetchall() return results def get_user(self, user): pass def get_password(self, password): pass if __name__ == '__main__': destdir = './../outputdir/' srcdir = './' folder_path = './../sqlfiledir/' file_list = os.listdir(folder_path) for sqlfile in file_list: file_path=str(folder_path)+str(sqlfile) with open(file_path, "r", encoding='utf-8') as f: sql = f.read() srfilename = sqlfile.split('.')[0] exporter = starrocksexporter('192.168.10.11', 19030, 'manager', 'sys_ro','sdagfsdg!@#saf134', sql,srcdir,destdir,srfilename) exporter.execute()
mysql的导出也可以用该脚本,因为starrocks/doris连接都是用的mysql驱动,兼容mysql语法。
方法扩展
下面小编为大家介绍一下python如何基于starrocks库连接查询starrocks数据库,需要的可以参考一下
sqlalchemy 用法
要使用 sqlalchemy 连接到 starrocks,连接字符串如下所示:
starrocks://<user>:<password>@<host>:<port>/<catalog>.<database>
import pandas as pd from sqlalchemy import create_engine, text # 设置 pandas 显示选项以显示所有列 pd.set_option('display.max_columns', none) pd.set_option('display.max_rows', none) pd.set_option('display.max_colwidth', none) """ 'starrocks://<user>:<password>@<host>:<port>/<catalog>.<database>' """ def query_user_data(user_name): # 连接到starrocks数据库 engine = create_engine('starrocks://test_user:test_user123@192.168.1.2:9030/sr_db') # 执行查询并获取结果 with engine.connect() as connection: sql_query = "select data from sr_db.user where user_name=" + user_name result = connection.execute(text(sql_query)).fetchall() # 将查询结果转换为 pandas dataframe ret_df = pd.dataframe(result) return ret_df # main function if __name__ == '__main__': console = console() user_name = "'tom'" df = query_user_data(user_name ) # 如果 dataframe 不为空,显示 if df is not none and not df.empty: print(df) else: print("数据为空")
到此这篇关于python定时查询starrocks数据库并将结果保存在excel的文章就介绍到这了,更多相关python定时查询starrocks内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论