连接mysql数据库并插入数据是python中常见的任务,通常可以通过mysql-connector-python库来实现。下面是一个简单的示例脚本,展示了如何连接mysql数据库并插入数据:
1.安装mysql-connector-python库
如果你还没有安装这个库,可以通过pip安装:
pip install mysql-connector-python
2.编写python脚本
下面是一个示例脚本,它连接到mysql数据库,创建一个新表(如果不存在),然后插入一些数据。
import mysql.connector
from mysql.connector import error
try:
# 连接到mysql数据库
connection = mysql.connector.connect(
host='localhost', # 数据库服务器地址
user='your_username', # 数据库用户名
password='your_password', # 数据库密码
database='your_database_name' # 数据库名
)
if connection.is_connected():
print("connected to mysql database")
# 创建一个cursor对象
cursor = connection.cursor()
# 创建表
create_table_query = """
create table if not exists employees (
id int auto_increment primary key,
name varchar(255) not null,
salary int,
department varchar(255)
);
"""
cursor.execute(create_table_query)
print("table created or already exists")
# 插入数据
insert_query = """
insert into employees (name, salary, department)
values (%s, %s, %s);
"""
data = ("john doe", 70000, "finance")
cursor.execute(insert_query, data)
connection.commit()
print("data inserted successfully")
except error as e:
print("error while connecting to mysql", e)
finally:
# 关闭cursor和连接
if connection.is_connected():
cursor.close()
connection.close()
print("mysql connection is closed")
3.运行脚本
将上述脚本保存为一个.py文件,然后运行它。确保你已经正确设置了数据库的连接参数(如主机、用户名、密码和数据库名)。
这个脚本首先尝试连接到mysql数据库,如果连接成功,它会检查是否存在一个名为employees的表,如果不存在则创建这个表。然后,脚本会向这个表中插入一行数据。最后,它会关闭数据库连接。
4.方法补充
python实现连接mysql脚本
完整代码:
#encoding=gbk
import sys
import mysqldb
conn=mysqldb.connection('127.0.0.1','root','123456','job',charset='gbk')
cur=conn.cursor()
cur.execute("select * from demo_jobs_store")
conn.commit()
conn.close()
cur.scroll(0)
row1=cur.fetchone()
print row1[3]python脚本连接mysql数据库的方法
下面介绍了如何利用python的pymysql库进行数据库登录、执行sql查询并获取所有数据的过程,包括连接数据库、执行sql语句、获取查询结果及关闭连接等关键步骤
完整代码:
import pymysql ## 登录账号 host = "host_name" user = "user_name" password = "password" database = "database_name" port = port conn = pymysql.connect(host, user, passwd, db, port) ## 登录数据库,返回连接 cursor = conn.cursor() sql = "select var1, ..., varn from table_name where ..." cursor.execute(sql) data = cursor.fetchall() ##data为以元组为元素的元组 conn.commit() cursor.close() conn.close() ## 关闭连接
python连接mysql工具类+进行数据驱动+配置模块
即:将python连接mysql相关的代码封装起来,形成一个工具类,在需要使用的时候调用
准备工作:1、key_demo——2、cases|tool——3、cases/case.py|tool/mydb.py
1、mydb.py工具类
# -*- coding: utf-8 -*-
# @author : hxy
# @time : 2022/1/18 15:20
# @function:
import pymysql
class my_db:
'''
动作类:获取数据连接,连接ip,端口,账号密码。。
'''
# 构造函数
def __init__(self):
try:
self.dbconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='root',
database='test_cases', charset='utf8')
except exception as e:
print('初始化数据库连接失败:%s' % e)
def close(self):
self.dbconn.close()
# query=查询语句
def select_record(self, query):
# 查询数据
print('query:%s' % query)
try:
# 建立游标
db_cursor = self.dbconn.cursor()
db_cursor.execute(query)
result = db_cursor.fetchall()
return result
except exception as e:
print('数据库查询数据失败:%s' % e)
db_cursor.close()
exit()
# 插入
def execute_insert(self, query):
print('query:%s' % query)
try:
# 建立游标
db_cursor = self.dbconn.cursor()
db_cursor.execute(query)
db_cursor.execute('commit')
return true
except exception as e:
print('数据库插入数据失败:%s' % e)
# 事务回滚
db_cursor.execute('rollback')
db_cursor.close()
exit()2、case.py数据驱动
# -*- coding: utf-8 -*-
# @author : hxy
# @time : 2022/1/7 15:07
# @function:
# 针对数据库中的数据做数据驱动
import time
import unittest
from ddt import ddt, data, unpack
from key_demo.tool.mydb import my_db
# 必须要实例化,不然会报typeerror: select_record() missing 1 required positional argument: 'query'
testdb = my_db()
now = time.strftime("%y-%m-%d %h:%m:%s", time.localtime())
@ddt
class case(unittest.testcase):
# @data(*my_db().select_record('select weaid,success from weather'))
@data(*testdb.select_record('select weaid,success from weather'))
@unpack
def test_1(self, weaid, success):
print(weaid, success)
testdb.execute_insert('insert into weather(weaid,success,cre_time) values ("2","2","%s")' % now)
if __name__ == '__main__':
unittest.main()这样写的插入语句会被调用多次,要注意@data中执行的操作具体执行次数
换了一个写法,先插入,再查询
# -*- coding: utf-8 -*-
# @author : hxy
# @time : 2022/1/7 15:07
# @function:
# 针对数据库中的数据做数据驱动
import time
import unittest
from ddt import ddt, data, unpack
from key_demo.tool.mydb import my_db
# 必须要实例化,不然会报typeerror: select_record() missing 1 required positional argument: 'query'
testdb = my_db()
now = time.strftime("%y-%m-%d %h:%m:%s", time.localtime())
@ddt
class case(unittest.testcase):
# @data(*my_db().select_record('select weaid,success from weather'))
@data(testdb.execute_insert('insert into weather(weaid,success,cre_time)values("2","2","%s")' % now))
# @unpack
def test_1(self,t):
print(t)
res = testdb.select_record('select weaid,success from weather')
print(res)
if __name__ == '__main__':
unittest.main()代码编写的时候有几个注意事项:
1、只执行一条语句,返回结果也只有一个,不需要@unpack解析
2、mydb.py中定义有返回值,还有ddt中定义的语法return func(self, *args, **kwargs),不能缺少参数
请确保在运行脚本之前,你的mysql服务器正在运行,并且你已经正确配置了数据库的访问权限。
到此这篇关于python脚本实现mysql数据库连接并插入数据的文章就介绍到这了,更多相关python脚本连接mysql内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论