一、安装库
python连接mysql、postgresql数据库需要导入相关的模块,分别是”pymysql“和”psycopg2“模块,我们可以在pycharm工具中直接搜索安装对应的库然后导入即可,两个数据库连接方式基本一致,所以只拿其中一个作为演示。

如果工具安装失败(有可能是网络原因),也可以使用pip命令进行安装
>>> pip3 install pymysql,psycopg2 >>> #多环境用户需要主要pip版本
如果你的系统不支持 pip 命令,可以使用以下方式安装:
1、使用 git 命令下载安装包安装(你也可以手动下载):
>>> git clone https://github.com/pymysql/pymysql >>> cd pymysql/ >>> python3 setup.py install
2、如果需要制定版本号,可以使用 curl 命令来安装:
>>> # x.x 为 pymysql 的版本号 >>> curl -l https://github.com/pymysql/pymysql/tarball/pymysql-x.x | tar xz >>> cd pymysql* >>> python3 setup.py install >>> # 现在你可以删除 pymysql* 目录
二、连接数据库
连接数据库会用到的参数:
database – 数据库名称 user – 用户名 password – 密码 host – 服务器地址 (如果不提供默认连接unix socket) port – 连接端口 (默认5432)
用pymysql模块中的connect连接数据库,同时用cursor()函数创建游标,用于接收返回的结果。


这里返回了一个游标实例对象,说明你已经连接成功了。
三、数据库操作
接下来我们可以使用 .execute 对数据库进行一系列的增删改查操作

运行成功后就可以在电脑上的数据库中查找到相应的变化啦,最后记得关闭数据库的连接,以下是查询结果:

四、代码奉上
我是通过读取配置文件读取数据库信息进行连接的,一并付上代码,有需要的可以参考,不需要的直接忽略
#setting.ini配置文件 [mysql] # mysql配置 mysql_host = 39.101.000.0 mysql_port = 5432 mysql_user = test mysql_passwd = xxxx mysql_db = xxxx_dev [postgresql] # postgresql配置 pg_host = 39.101.000.0 pg_port = 5432 pg_user = test pg_passwd = xxxxx pg_db = xxxxxx_dev
# load_ini.py文件,读取ini配置文件方法
def load_ini(file_path):
log.info("加载 {} 文件......".format(file_path))
config = myconfigparser()
config.read(file_path, encoding="utf-8")
data = dict(config._sections)
return data
#run.py文件
#导入所需包
import os
import pymysql
import psycopg2
from common.load_ini import load_ini
#拼接路径以列表的形式进行读取配置文件
base_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
data_file_path = os.path.join(base_path, "config", "setting.ini")
mysql_data = load_ini(data_file_path)['mysql']
pg_data = load_ini(data_file_path)['postgresql']
#对库信息进行赋值,此方法只作为演示,大佬可自行编写最优写法
mysql_host = mysql_data["mysql_host"]
mysql_port = int(mysql_data["mysql_port"])
mysql_username = mysql_data["mysql_user"]
mysql_password = mysql_data["mysql_passwd"]
mysql_database = mysql_data["mysql_db"]
pg_host = pg_data["pg_host"]
pg_port = int(pg_data["pg_port"])
pg_username = pg_data["pg_user"]
pg_password = pg_data["pg_passwd"]
pg_database = pg_data["pg_db"]
#连接mysql数据库函数
def my_sql_database():
#连库相关信息
mydb = pymysql.connect(
host=mysql_host,
port=mysql_port,
user=mysql_username,
passwd=mysql_password,
db=mysql_database,
charset='utf8'
)
mycursor = mydb.cursor()
# 写入sql语句对数据库进行增删改操作即可
sql = "select * from jg_staff where staff_name = '测试1华北项目组'"
# mycursor.execute("show databases")
mycursor.execute(sql)
print(mycursor.fetchall())
mydb.commit() # 查询时不需需调用,此方法用于提交当前事务。如果不调用这个方法,无论做了什么修改,自从上次调用#commit()是不可见的
# 关闭数据库
mycursor.close() # 关闭游标
mycursor.close() # 关闭数据库连接
#连接postgresql数据库函数
def pg_sql_database():
#postgresql数据库连接方法和mysql方法基本一致
conn = psycopg2.connect(
host=pg_host,
port=pg_port,
user=pg_username,
password=pg_password,
database=pg_database
)
cur = conn.cursor()
sql = "select area from b_lands_attribute_2021 bla where id = '20545521'"
cur.execute(sql)
print(cur.fetchall()) #打印结果
conn.commit() # 查询时不需需调用,此方法用于提交当前事务。如果不调用这个方法,无论做了什么修改,自从上次调用#commit()是不可见的
# 关闭数据库
cur.close() # 关闭游标
cur.close() # 关闭数据库连接
def my_sql_insert(sql_case):
"""
往sql插入数据
:param sql_case:
:return:
"""
mydb = pymysql.connect(
host=mysql_host,
port=mysql_port,
user=mysql_username,
passwd=mysql_password,
db=mysql_database,
charset='utf8mb4'
)
mycursor = mydb.cursor()
mycursor.execute(sql_case)
mydb.commit()
mydb.close()
def mysql_select(sql_case):
"""
查询sql以列表形式输出
:param sql_case:
:return:
"""
try:
connection = pymysql.connect(
host=mysql_host,
port=mysql_port,
user=mysql_username,
passwd=mysql_password,
db=mysql_database,
charset='utf8mb4'
)
with connection.cursor() as cursor:
cursor.execute(sql_case)
results = []
for row in cursor.fetchall():
# 将包含decimal的列转换为浮点数
processed_row = [float(item) if isinstance(item, decimal) else item for item in row]
results.append(processed_row)
except mysqlerror as e:
print(f"error: {e}")
results = []
finally:
if connection:
connection.close()
return results
def my_sql_select(sql_select):
"""
查询sql以数据类型输出
:param sql_select:
:return:
"""
mydb = pymysql.connect(
host=mysql_host,
port=mysql_port,
user=mysql_username,
passwd=mysql_password,
db=mysql_database,
charset='utf8mb4'
)
mycursor = mydb.cursor()
mycursor.execute(sql_select)
nested_list = [list(row) for row in mycursor.fetchall()]
mydb.close()
return nested_list
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论