
这是一个用 go 语言开发的通达信数据导出工具,可以将通达信的本地数据导出为多种格式,方便用户进行数据分析和处理。
主要功能
- 支持多种数据类型导出:
- 日线数据
- 5分钟线数据
- 1分钟线数据
- 支持多种导出格式:
- csv 格式
- sqlite 数据库
- excel 文件
- postgres数据库连接导出
特点
- 图形化界面,操作简单直观
- 支持增量导出,避免重复处理
- 可配置数据源和导出路径
- 实时显示处理进度
- 支持批量处理大量数据
使用说明
- 设置通达信数据路径
- 选择要导出的数据类型
- 选择导出格式
- 点击导出按钮开始处理
技术栈
- go 语言开发
- fyne gui 框架
- sqlite/csv/excel/postgres 数据处理
配置要求
- 需要本地安装通达信软件
- 需要有通达信的历史数据
注意事项
- 首次使用需要配置通达信数据路径
- 建议定期备份导出的数据
- 大量数据导出可能需要较长时间

所有测试工作都在在mac中进行,源码可以在多平台运行.程序目录结构

main.go
package main
import (
"fmt"
"os"
"path/filepath"
"strconv"
"tdx_exporter/config"
"tdx_exporter/tdx"
"time"
_ "github.com/lib/pq"
"image/color"
"fyne.io/fyne/v2"
"fyne.io/fyne/v2/app"
"fyne.io/fyne/v2/container"
"fyne.io/fyne/v2/dialog"
"fyne.io/fyne/v2/layout"
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
)
var (
daygroup *widget.checkgroup
mingroup *widget.checkgroup
)
// 创建自定义主题
type mytheme struct {
fyne.theme
}
func (m mytheme) color(name fyne.themecolorname, variant fyne.themevariant) color.color {
if name == theme.colornameforeground {
return color.nrgba{r: 0, g: 0, b: 0, a: 255} // 黑色
}
return theme.defaulttheme().color(name, variant)
}
func main() {
// 设置中文编码
os.setenv("lang", "zh_cn.utf-8")
myapp := app.new()
// 设置中文字体
myapp.settings().settheme(&mytheme{theme.defaulttheme()})
mywindow := myapp.newwindow("通达信数据工具")
// 创建数据类型选择容器,分组显示
daygroup = widget.newcheckgroup([]string{"日据"}, nil)
daygroup.setselected([]string{"日线数据"})
mingroup = widget.newcheckgroup([]string{"1分钟线", "5分钟线"}, nil)
// 然后创建格式选择
formatselect := widget.newselect([]string{"postgres", "csv", "sqlite", "excel"}, func(value string) {
if value == "sqlite" || value == "postgres" {
// 默认选中所有选项,但保持可选状态
if len(daygroup.selected) == 0 {
daygroup.setselected([]string{"日线数据"})
}
if len(mingroup.selected) == 0 {
mingroup.setselected([]string{"1分钟线", "5分钟线"})
}
// 移除禁用代码,保持控件可选
daygroup.enable()
mingroup.enable()
} else {
// csv或excel格式时保持原有逻辑
daygroup.enable()
mingroup.enable()
}
})
formatselect.setselected("postgres")
// 创建结果显示区域 - 修改为更大的显示区域
resultarea := widget.newmultilineentry()
resultarea.disable()
resultarea.setplaceholder("导出结果将在这里显示")
resultarea.resize(fyne.newsize(580, 300)) // 设置更大的尺寸
resultarea.setminrowsvisible(15) // 显示更多行
// 创建按钮
settingsbtn := widget.newbuttonwithicon("设置", theme.settingsicon(), func() {
showsettingsdialog(mywindow)
})
var exportbtn, updatebtn *widget.button
// 创建左侧布局
leftpanel := container.newvbox(
widget.newlabel("导出格式:"),
formatselect,
widget.newseparator(), // 添加隔线
widget.newlabel("数据类型选择:"),
daygroup,
mingroup,
)
exportbtn = widget.newbuttonwithicon("导出", theme.documentsaveicon(), func() {
exportbtn.disable()
settingsbtn.disable()
formatselect.disable()
startexport(mywindow, formatselect.selected, func() {
exportbtn.enable()
settingsbtn.enable()
formatselect.enable()
})
})
updatebtn = widget.newbuttonwithicon("更新", theme.viewrefreshicon(), func() {
updatebtn.disable()
settingsbtn.disable()
exportbtn.disable()
formatselect.disable()
startupdate(mywindow, func() {
updatebtn.enable()
settingsbtn.enable()
exportbtn.enable()
formatselect.enable()
})
})
// 创建按钮布局
buttons := container.newhbox(
layout.newspacer(), // 添加弹性空间使按钮居中
settingsbtn,
exportbtn,
updatebtn,
layout.newspacer(),
)
// 创建 memo 控件
memo := widget.newmultilineentry()
memo.disable() // 设置为只读
memo.setplaceholder("提示信息将在这里显示")
memo.setminrowsvisible(5) // 设置最小显示行数
memo.settext("欢迎使用通达信数据工具\n请选择要导出的数据类型和格式")
// 创建主布局
content := container.newborder(
buttons,
nil,
container.newpadded(leftpanel),
nil,
memo,
)
mywindow.setcontent(content)
mywindow.resize(fyne.newsize(800, 400))
mywindow.showandrun()
}
func showsettingsdialog(window fyne.window) {
settings, err := config.loadsettings()
if err != nil {
dialog.showerror(err, window)
return
}
// 基本设置页面
tdxpath := widget.newentry()
tdxpath.settext(settings.tdxpath)
tdxpath.setplaceholder("请输入通达信数据目路径")
exportpath := widget.newentry()
exportpath.settext(settings.exportpath)
exportpath.setplaceholder("请输入导出数据保存路径")
// 数据库设置页面
dbhost := widget.newentry()
dbhost.settext(settings.dbconfig.host)
dbhost.setplaceholder("数据库主机地址")
dbport := widget.newentry()
dbport.settext(fmt.sprintf("%d", settings.dbconfig.port))
dbport.setplaceholder("端口号")
dbuser := widget.newentry()
dbuser.settext(settings.dbconfig.user)
dbuser.setplaceholder("用户名")
dbpassword := widget.newpasswordentry()
dbpassword.settext(settings.dbconfig.password)
dbpassword.setplaceholder("密码")
dbname := widget.newentry()
dbname.settext(settings.dbconfig.dbname)
dbname.setplaceholder("数据库名")
testconnbtn := widget.newbutton("测试连接", func() {
// ... 测试连接代码 ...
})
// 修改数据库设置页面布局
dbsettings := container.newvbox(
// 添加测试连接按钮到顶部
container.newhbox(
layout.newspacer(),
testconnbtn,
layout.newspacer(),
),
widget.newseparator(), // 分隔线
container.newgridwithcolumns(2,
container.newvbox(
widget.newlabel("主机地址:"),
dbhost,
widget.newlabel("用户名:"),
dbuser,
widget.newlabel("数据库名:"),
dbname,
),
container.newvbox(
widget.newlabel("端口号:"),
dbport,
widget.newlabel("密码:"),
dbpassword,
),
),
)
// 修改基本设置页面布局
basicsettings := container.newvbox(
widget.newlabel("通达信数据路径:"),
container.newpadded(tdxpath),
widget.newseparator(),
widget.newlabel("导出数据保存路径:"),
container.newpadded(exportpath),
)
// 创建标签页
tabs := container.newapptabs(
container.newtabitem("基本设置", container.newpadded(basicsettings)),
container.newtabitem("数据库设置", container.newpadded(dbsettings)),
)
tabs.settablocation(container.tablocationtop)
dialog := dialog.newcustomconfirm(
"参数设置",
"确定",
"取消",
tabs,
func(ok bool) {
if !ok {
return
}
port, _ := strconv.atoi(dbport.text)
newsettings := &config.settings{
tdxpath: tdxpath.text,
exportpath: exportpath.text,
dbconfig: config.dbconfig{
host: dbhost.text,
port: port,
user: dbuser.text,
password: dbpassword.text,
dbname: dbname.text,
},
exportpaths: settings.exportpaths,
}
if err := config.savesettings(newsettings); err != nil {
dialog.showerror(err, window)
return
}
dialog.showinformation("成功", "设置已保存", window)
},
window,
)
// 设置对话框大小
dialog.resize(fyne.newsize(500, 400))
dialog.show()
}
// 修改 startexport 函数
func startexport(window fyne.window, format string, oncomplete func()) {
settings, err := config.loadsettings()
if err != nil {
dialog.showerror(err, window)
oncomplete()
return
}
if settings.tdxpath == "" {
dialog.showerror(fmt.errorf("请先在参数设置中设置通达信数据路径"), window)
oncomplete()
return
}
go func() {
processor := tdx.newdataprocessor(settings.tdxpath)
lastexportinfo, haslastexport := settings.getlastexportinfo(format)
exportopts := tdx.exportoptions{
isincremental: haslastexport,
lastexporttime: lastexportinfo.lasttime,
datatypes: tdx.datatypes{
day: contains(daygroup.selected, "日线数据"),
min1: contains(mingroup.selected, "1分钟线"),
min5: contains(mingroup.selected, "5分钟线"),
},
targetdir: settings.exportpath,
}
var exporterr error
switch format {
case "postgres":
dbconfig := tdx.dbconfig{
host: settings.dbconfig.host,
port: settings.dbconfig.port,
user: settings.dbconfig.user,
password: settings.dbconfig.password,
dbname: settings.dbconfig.dbname,
}
exporterr = processor.exporttopostgres(dbconfig, exportopts)
case "csv":
exporterr = processor.exporttocsv(settings.exportpath, exportopts)
case "sqlite":
outputpath := filepath.join(settings.exportpath, "export.db")
exporterr = processor.exporttosqlite(outputpath, exportopts)
case "excel":
outputpath := filepath.join(settings.exportpath, "export.xlsx")
exporterr = processor.exporttoexcel(outputpath, exportopts)
}
if exporterr != nil {
dialog.showerror(exporterr, window)
} else {
dialog.showinformation("成功", "数据导出完成", window)
settings.updateexportinfo(format, settings.exportpath, time.now().format("2006-01-02"))
config.savesettings(settings)
}
oncomplete()
}()
}
// 添加更新处理函数
func startupdate(window fyne.window, oncomplete func()) {
settings, err := config.loadsettings()
if err != nil {
dialog.showerror(err, window)
oncomplete()
return
}
if settings.tdxpath == "" {
dialog.showerror(fmt.errorf("请先在参数设置中设置通达信数据路径"), window)
oncomplete()
return
}
go func() {
processor := tdx.newdataprocessor(settings.tdxpath)
progress := dialog.newprogress("更新数据", "正在更新...", window)
progress.show()
progresscallback := func(stockcode string, current, total int) {
progress.setvalue(float64(current) / float64(total))
}
if err := processor.updatedata(progresscallback); err != nil {
progress.hide()
dialog.showerror(err, window)
} else {
progress.hide()
dialog.showinformation("成功", "数据更新完成", window)
}
oncomplete()
}()
}
// 辅助函数:检查字符串是否在切片中
func contains(slice []string, str string) bool {
for _, v := range slice {
if v == str {
return true
}
}
return false
}go.mod
module tdx_exporter go 1.23.1 require ( fyne.io/fyne/v2 v2.5.2 github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.24 github.com/xuri/excelize/v2 v2.9.0 ) require ( fyne.io/systray v1.11.0 // indirect github.com/burntsushi/toml v1.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fredbi/uri v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fyne-io/gl-js v0.0.0-20220119005834-d2da28d9ccfe // indirect github.com/fyne-io/glfw-js v0.0.0-20240101223322-6e1efdc71b7a // indirect github.com/fyne-io/image v0.0.0-20220602074514-4956b0afb3d2 // indirect github.com/go-gl/gl v0.0.0-20211210172815-726fda9656d6 // indirect github.com/go-gl/glfw/v3.3/glfw v0.0.0-20240506104042-037f3cc74f2a // indirect github.com/go-text/render v0.2.0 // indirect github.com/go-text/typesetting v0.2.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/jeandeaual/go-locale v0.0.0-20240223122105-ce5225dcaa49 // indirect github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/nicksnyder/go-i18n/v2 v2.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/richardlehane/mscfb v1.0.4 // indirect github.com/richardlehane/msoleps v1.0.4 // indirect github.com/rymdport/portal v0.2.6 // indirect github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirect github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirect github.com/yuin/goldmark v1.7.1 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/image v0.18.0 // indirect golang.org/x/mobile v0.0.0-20231127183840-76ac6878050a // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect )
data_processor.go
package tdx
import (
"bytes"
"database/sql"
"encoding/binary"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
_ "github.com/mattn/go-sqlite3"
"github.com/xuri/excelize/v2"
)
type dbconfig struct {
host string
port int
user string
password string
dbname string
}
type daydata struct {
date string `json:"date"`
open float64 `json:"open"`
high float64 `json:"high"`
low float64 `json:"low"`
close float64 `json:"close"`
amount float64 `json:"amount"`
volume int64 `json:"volume"`
}
type tdxdata struct {
date string
open float64
high float64
low float64
close float64
volume int64
amount float64
}
type dataprocessor struct {
datapath string
}
type exportoptions struct {
lastexporttime string
isincremental bool
targetdir string
datatypes datatypes
logcallback logcallback
}
type logcallback func(format string, args ...interface{})
// 添加进度回调函数类型
type progresscallback func(stockcode string, current, total int)
// 添加数据类型结构
type datatypes struct {
day bool
min5 bool
min1 bool
}
// 添加数据结构定义
type tdxminrecord struct {
date uint16 // 日期,2字节
minute uint16 // 分钟,2字节
open float32 // 开盘价,4字节
high float32 // 最高价,4字节
low float32 // 最低价,4字节
close float32 // 收盘价,4字节
amount float32 // 20-23字节:成交额(元),single float
volume uint32 // 24-27字节:成交量(股),ulong
reserved uint32 // 28-31字节:保留
}
// 修改记录结构定义,分开日线和分钟线
type tdxdayrecord struct {
date uint32 // 日期,4字节,格式: yyyymmdd
open uint32 // 开盘价,4字节
high uint32 // 最高价,4字节
low uint32 // 最低价,4字节
close uint32 // 收盘价,4字节
amount float32 // 成交额,4字节
volume uint32 // 成交量,4字节
reserved uint32 // 保留,4字节
}
func newdataprocessor(path string) *dataprocessor {
return &dataprocessor{
datapath: path,
}
}
func (dp *dataprocessor) exporttocsv(outputpath string, opts exportoptions) error {
// 使用传入的输出路径作为基础目录
opts.targetdir = outputpath
return dp.transformdata(opts)
}
func (dp *dataprocessor) exporttosqlite(dbpath string, opts exportoptions) error {
log := opts.logcallback
if log == nil {
log = func(format string, args ...interface{}) {
fmt.printf(format+"\n", args...)
}
}
log("创建sqlite数据库...")
db, err := sql.open("sqlite3", dbpath)
if err != nil {
return fmt.errorf("创建sqlite数据库失败: %v", err)
}
defer db.close()
log("创建数据表...")
if err := dp.createtables(db); err != nil {
return fmt.errorf("创建表失败: %v", err)
}
// 处理不同周期的数据
if opts.datatypes.day {
log("正在导出日线数据到sqlite...")
if err := dp.exportdaydatatosqlite(db, log); err != nil {
return fmt.errorf("导出日线数据失败: %v", err)
}
}
if opts.datatypes.min5 {
log("正在导出5分钟数据到sqlite...")
if err := dp.exportmindatatosqlite(db, "5min", "fivemin", log); err != nil {
return fmt.errorf("导出5分钟数据失败: %v", err)
}
}
if opts.datatypes.min1 {
log("正在导出1分钟数据到sqlite...")
if err := dp.exportmindatatosqlite(db, "1min", "onemin", log); err != nil {
return fmt.errorf("导出1分钟数据失败: %v", err)
}
}
log("数据导出完成")
return nil
}
func (dp *dataprocessor) createtables(db *sql.db) error {
// 日线数据表
_, err := db.exec(`
create table if not exists stock_day_data (
代码 text,
日期 text,
开盘价 real,
最高价 real,
最低价 real,
收盘价 real,
成交额 real,
成交量 integer,
primary key (代码, 日期)
)
`)
if err != nil {
return err
}
// 5分钟数据表
_, err = db.exec(`
create table if not exists stock_5min_data (
代码 text,
日期 text,
时间 text,
开盘价 real,
最高价 real,
最低价 real,
收盘价 real,
成交额 real,
成交量 integer,
primary key (代码, 日期, 时间)
)
`)
if err != nil {
return err
}
// 1分钟数据表
_, err = db.exec(`
create table if not exists stock_1min_data (
代码 text,
日期 text,
时间 text,
开盘价 real,
最高价 real,
最低价 real,
收盘价 real,
成交额 real,
成交量 integer,
primary key (代码, 日期, 时间)
)
`)
return err
}
func (dp *dataprocessor) exportmindatatosqlite(db *sql.db, period string, dirname string, log logcallback) error {
csvdir := filepath.join(os.getenv("home"), "tdx_export", dirname)
files, err := os.readdir(csvdir)
if err != nil {
return fmt.errorf("读取csv目录失败: %v", err)
}
// 开始事务
tx, err := db.begin()
if err != nil {
return fmt.errorf("开始事务失败: %v", err)
}
// 准备插入语句
tablename := fmt.sprintf("stock_%s_data", period)
stmt, err := tx.prepare(fmt.sprintf(`
insert or replace into %s (
代码, 日期, 时间, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量
) values (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, tablename))
if err != nil {
tx.rollback()
return fmt.errorf("准备sql语句失败: %v", err)
}
defer stmt.close()
// 处理每个csv文件
filecount := 0
for _, file := range files {
if !file.isdir() && strings.hassuffix(strings.tolower(file.name()), ".csv") {
stockcode := strings.trimsuffix(file.name(), ".csv")
if stockcode == "all_codes" {
continue
}
filecount++
log("正在处理%s数据,股票代码:%s (%d/%d)", period, stockcode, filecount, len(files)-1)
// 读取csv文件
csvpath := filepath.join(csvdir, file.name())
csvfile, err := os.open(csvpath)
if err != nil {
tx.rollback()
return fmt.errorf("打开csv文件失败 %s: %v", file.name(), err)
}
reader := csv.newreader(csvfile)
// 跳过标题行
reader.read()
// 读取数据
recordcount := 0
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
csvfile.close()
tx.rollback()
return fmt.errorf("读取csv记录失败: %v", err)
}
// 转换数据类型
open, _ := strconv.parsefloat(record[2], 64)
high, _ := strconv.parsefloat(record[3], 64)
low, _ := strconv.parsefloat(record[4], 64)
close, _ := strconv.parsefloat(record[5], 64)
amount, _ := strconv.parsefloat(record[6], 64)
volume, _ := strconv.parseint(record[7], 10, 64)
recordcount++
// 插入数据
_, err = stmt.exec(
stockcode,
record[0], // 日期
record[1], // 时间
open,
high,
low,
close,
amount,
volume,
)
if err != nil {
csvfile.close()
tx.rollback()
return fmt.errorf("插入数据失败: %v", err)
}
}
log("完成处理 %s,共导入 %d 条记录", stockcode, recordcount)
csvfile.close()
}
}
// 提交事务
if err := tx.commit(); err != nil {
return fmt.errorf("提交事务失败: %v", err)
}
log("完成导出%s数据,共处理 %d 个文件", period, filecount)
return nil
}
func (dp *dataprocessor) exportdaydatatosqlite(db *sql.db, log logcallback) error {
csvdir := filepath.join(os.getenv("home"), "tdx_export", "day")
files, err := os.readdir(csvdir)
if err != nil {
log("读取csv目录失败: %v", err)
return fmt.errorf("读取csv目录失败: %v", err)
}
log("开始导出日线数据到sqlite...")
tx, err := db.begin()
if err != nil {
return fmt.errorf("开始事务失败: %v", err)
}
stmt, err := tx.prepare(`
insert or replace into stock_day_data (
代码, 日期, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量
) values (?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
tx.rollback()
return fmt.errorf("准备sql语句失败: %v", err)
}
defer stmt.close()
for _, file := range files {
if !file.isdir() && strings.hassuffix(strings.tolower(file.name()), ".csv") {
stockcode := strings.trimsuffix(file.name(), ".csv")
if stockcode == "all_codes" {
continue
}
log("正在处理股票: %s", stockcode)
csvfile, err := os.open(filepath.join(csvdir, file.name()))
if err != nil {
tx.rollback()
return fmt.errorf("打开csv文件失败 %s: %v", file.name(), err)
}
reader := csv.newreader(csvfile)
reader.read() // 跳过标题行
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
csvfile.close()
tx.rollback()
return fmt.errorf("读取csv记录失败: %v", err)
}
open, _ := strconv.parsefloat(record[1], 64)
high, _ := strconv.parsefloat(record[2], 64)
low, _ := strconv.parsefloat(record[3], 64)
close, _ := strconv.parsefloat(record[4], 64)
amount, _ := strconv.parsefloat(record[5], 64)
amount /= 100
volume, _ := strconv.parseint(record[6], 10, 64)
volume /= 100
_, err = stmt.exec(
stockcode,
record[0], // 日期
open, high, low, close,
amount, volume,
)
if err != nil {
csvfile.close()
tx.rollback()
return fmt.errorf("插入数据失败: %v", err)
}
}
csvfile.close()
}
}
log("日线数据导出完成")
return tx.commit()
}
func (dp *dataprocessor) readtdxdata() ([]tdxdata, error) {
if dp.datapath == "" {
return nil, errors.new("通达信数据路径未设置")
}
// todo: 实现通达信数据获取
return nil, nil
}
func (dp *dataprocessor) transformdata(opts exportoptions) error {
log := opts.logcallback
if log == nil {
log = func(format string, args ...interface{}) {
fmt.printf(format+"\n", args...)
}
}
// 使用传入的输出路径
basedir := opts.targetdir
if basedir == "" {
basedir = filepath.join(os.getenv("home"), "tdx_export")
}
// 创建不同时间周期的目录
targetdirs := map[string]string{
"day": filepath.join(basedir, "day"),
"min1": filepath.join(basedir, "min1"),
"min5": filepath.join(basedir, "min5"),
}
// 根据选择的数据类型过滤源
var selectedsources []struct {
path string
interval string
}
// 根据用户选择添加数据源
if opts.datatypes.day {
selectedsources = append(selectedsources,
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sz", "lday"), "day"},
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sh", "lday"), "day"},
)
}
if opts.datatypes.min1 {
selectedsources = append(selectedsources,
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sz", "minline"), "min1"},
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sh", "minline"), "min1"},
)
}
if opts.datatypes.min5 {
selectedsources = append(selectedsources,
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sz", "fzline"), "min5"},
struct{ path, interval string }{filepath.join(dp.datapath, "vipdoc", "sh", "fzline"), "min5"},
)
}
// 确保目标目录存在
for _, dir := range targetdirs {
if err := os.mkdirall(dir, 0755); err != nil {
return fmt.errorf("创建目标目录失败: %v", err)
}
}
// 处理选中的数据源
for _, source := range selectedsources {
files, err := os.readdir(source.path)
if err != nil {
continue
}
for _, file := range files {
if !file.isdir() {
var fileext string
switch source.interval {
case "day":
fileext = ".day"
case "min1":
fileext = ".lc1"
case "min5":
fileext = ".lc5"
}
if strings.hassuffix(strings.tolower(file.name()), fileext) {
if err := dp.converttocsv(source.path, file.name(), targetdirs[source.interval], source.interval); err != nil {
continue
}
}
}
}
}
return nil
}
func (dp *dataprocessor) converttocsv(sourcepath, filename, targetdir string, datatype string) error {
// 读取源文件
sourcefile, err := os.readfile(filepath.join(sourcepath, filename))
if err != nil {
return err
}
// 创建目标文件
targetfile, err := os.create(filepath.join(targetdir, strings.trimsuffix(filename, filepath.ext(filename))+".csv"))
if err != nil {
return err
}
defer targetfile.close()
// 写入csv头
var header string
switch datatype {
case "day":
header = "日期,开盘价,最高价,最低价,收盘价,成交额,成交量\n"
case "min1", "min5":
header = "日期,时间,开盘价,最高价,最低价,收盘价,成交额,成交量\n"
}
// 写入 utf-8 bom,确保 excel 正确识别中文
if _, err := targetfile.write([]byte{0xef, 0xbb, 0xbf}); err != nil {
return fmt.errorf("写入 bom 失败: %v", err)
}
if _, err := targetfile.writestring(header); err != nil {
return fmt.errorf("写入csv头失败: %v", err)
}
// 处理记录
recordsize := 32
recordcount := len(sourcefile) / recordsize
for i := 0; i < recordcount; i++ {
offset := i * recordsize
var line string
switch datatype {
case "day":
var record tdxdayrecord
binary.read(bytes.newreader(sourcefile[offset:offset+recordsize]), binary.littleendian, &record)
line = dp.formatdayrecord(record)
case "min1", "min5":
var record tdxminrecord
binary.read(bytes.newreader(sourcefile[offset:offset+recordsize]), binary.littleendian, &record)
line = dp.formatminrecord(record)
}
targetfile.writestring(line)
}
return nil
}
// updatedata 增量更新数据
func (dp *dataprocessor) updatedata(progress progresscallback) error {
// 取csv文件目录
csvdir := filepath.join(os.getenv("home"), "tdx_export", "day")
// 读取所有股票列表
codes, err := dp.readallcodes(filepath.join(csvdir, "all_codes.csv"))
if err != nil {
return fmt.errorf("读取代码列表失败: %v", err)
}
for i, code := range codes {
if progress != nil {
progress(code, i+1, len(codes))
}
// 取现有csv文件的最后一个日期
csvpath := filepath.join(csvdir, code+".csv")
lastdate, err := dp.getlastdate(csvpath)
if err != nil {
return fmt.errorf("读取文件 %s 失败: %v", code, err)
}
// 确定数据文件路径
market := "sz"
if strings.hasprefix(code, "6") || strings.hasprefix(code, "5") {
market = "sh"
}
daypath := filepath.join(dp.datapath, "vipdoc", market, "lday", code+".day")
// 增量更新数据
if err := dp.appendnewdata(daypath, csvpath, lastdate); err != nil {
return fmt.errorf("更新文件 %s 失败: %v", code, err)
}
}
return nil
}
// readallcodes 读取代码列表文件
func (dp *dataprocessor) readallcodes(filepath string) ([]string, error) {
file, err := os.open(filepath)
if err != nil {
return nil, err
}
defer file.close()
reader := csv.newreader(file)
records, err := reader.readall()
if err != nil {
return nil, err
}
var codes []string
for i, record := range records {
if i == 0 { // 跳过标题行
continue
}
codes = append(codes, record[0])
}
return codes, nil
}
// getlastdate 获取csv文件中最后一个日期
func (dp *dataprocessor) getlastdate(filepath string) (string, error) {
file, err := os.open(filepath)
if err != nil {
return "", err
}
defer file.close()
reader := csv.newreader(file)
var lastdate string
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
return "", err
}
if len(record) > 0 {
lastdate = record[0] // 第一是日期
}
}
return lastdate, nil
}
// appendnewdata 追加新数据到csv文件
func (dp *dataprocessor) appendnewdata(daypath, csvpath, lastdate string) error {
// 读取day文件
dayfile, err := os.readfile(daypath)
if err != nil {
return err
}
// 打开csv文件用于追加
csvfile, err := os.openfile(csvpath, os.o_append|os.o_wronly, 0644)
if err != nil {
return err
}
defer csvfile.close()
// 处理每条记录
recordsize := 32
recordcount := len(dayfile) / recordsize
for i := 0; i < recordcount; i++ {
offset := i * recordsize
var record tdxminrecord
err := binary.read(strings.newreader(string(dayfile[offset:offset+recordsize])), binary.littleendian, &record)
if err != nil {
return err
}
// 转换日期 - 使用 yyyymmdd 格式
year := record.date / 10000
month := (record.date % 10000) / 100
day := record.date % 100
date := fmt.sprintf("%d-%02d-%02d", year, month, day)
// 只追加新数据
if date <= lastdate {
continue
}
// 写入新数据
line := fmt.sprintf("%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",
date,
float64(record.open)/100.0,
float64(record.high)/100.0,
float64(record.low)/100.0,
float64(record.close)/100.0,
float64(record.amount)/100.0,
record.volume)
if _, err := csvfile.writestring(line); err != nil {
return err
}
}
return nil
}
func (dp *dataprocessor) formatdayrecord(record tdxdayrecord) string {
// format date: yyyymmdd
date := fmt.sprintf("%d-%02d-%02d",
record.date/10000,
(record.date%10000)/100,
record.date%100)
// day prices need to be divided by 100 to get the actual value
return fmt.sprintf("%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",
date,
float64(record.open)/100.0,
float64(record.high)/100.0,
float64(record.low)/100.0,
float64(record.close)/100.0,
float64(record.amount)/100.0, // amount is already in correct format
int(record.volume)/100)
}
func (dp *dataprocessor) formatminrecord(record tdxminrecord) string {
// 解析日期
year := 2004 + (record.date / 2048)
month := (record.date % 2048) / 100
day := record.date % 2048 % 100
date := fmt.sprintf("%d-%02d-%02d", year, month, day)
// 解析时间
hour := record.minute / 60
minute := record.minute % 60
time := fmt.sprintf("%02d:%02d", hour, minute)
// 格式化输出,将日期和时间分为两个字段
return fmt.sprintf("%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",
date, // 日期字段
time, // 时间字段
record.open, // 开盘价
record.high, // 最高价
record.low, // 最低价
record.close, // 收盘价
float64(record.amount)/100, // 成交额
record.volume/100)
}
func (dp *dataprocessor) exporttoexcel(outputpath string, opts exportoptions) error {
// 创建 excel 主目录
exceldir := filepath.join(outputpath, "excel")
// 创建不同时间周期的目录
exceldirs := map[string]string{
"day": filepath.join(exceldir, "day"),
"min1": filepath.join(exceldir, "min1"),
"min5": filepath.join(exceldir, "min5"),
}
// 确保目标目录存在
for _, dir := range exceldirs {
if err := os.mkdirall(dir, 0755); err != nil {
return fmt.errorf("创建excel目录失败: %v", err)
}
}
// 先导出到csv
if err := dp.transformdata(opts); err != nil {
return fmt.errorf("转换数据失败: %v", err)
}
// 处理不同周期的数据
if opts.datatypes.day {
if err := dp.exportdaydatatoexcel(exceldirs["day"]); err != nil {
return fmt.errorf("导出日线数据失败: %v", err)
}
}
if opts.datatypes.min5 {
if err := dp.exportmindatatoexcel(exceldirs["min5"], "fivemin", "5分钟"); err != nil {
return fmt.errorf("导出5分钟数据失败: %v", err)
}
}
if opts.datatypes.min1 {
if err := dp.exportmindatatoexcel(exceldirs["min1"], "onemin", "1分钟"); err != nil {
return fmt.errorf("导出1分钟数据失败: %v", err)
}
}
return nil
}
func (dp *dataprocessor) exportmindatatoexcel(outputpath, dirname, sheetprefix string) error {
csvdir := filepath.join(os.getenv("home"), "tdx_export", dirname)
files, err := os.readdir(csvdir)
if err != nil {
return fmt.errorf("读取csv目录失败: %v", err)
}
filecount := 0
for _, file := range files {
if !file.isdir() && strings.hassuffix(strings.tolower(file.name()), ".csv") {
stockcode := strings.trimsuffix(file.name(), ".csv")
if stockcode == "all_codes" {
continue
}
filecount++
fmt.printf("正在处理%s数据,股票代码:%s (%d/%d)\n", sheetprefix, stockcode, filecount, len(files)-1)
// 创建新的excel文件
f := excelize.newfile()
defer f.close()
// 设置默认sheet名称
sheetname := fmt.sprintf("%s_%s", stockcode, sheetprefix)
index, err := f.newsheet(sheetname)
if err != nil {
return fmt.errorf("创建sheet失败: %v", err)
}
f.deletesheet("sheet1")
f.setactivesheet(index)
// 写入表头
headers := []string{"日期", "时间", "开盘价", "最高价", "最低价", "收盘价", "成交额", "成交量"}
for i, header := range headers {
cell := fmt.sprintf("%c1", 'a'+i)
f.setcellvalue(sheetname, cell, header)
}
// 读取csv数据
csvpath := filepath.join(csvdir, file.name())
csvfile, err := os.open(csvpath)
if err != nil {
return fmt.errorf("打开csv文件失败 %s: %v", file.name(), err)
}
reader := csv.newreader(csvfile)
reader.read() // 跳过标题行
row := 2 // 从第2行开始写入数据
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
csvfile.close()
return fmt.errorf("读取csv记录失败: %v", err)
}
// 写入数据行
for i, value := range record {
cell := fmt.sprintf("%c%d", 'a'+i, row)
f.setcellvalue(sheetname, cell, value)
}
row++
}
csvfile.close()
// 保存excel文件
excelpath := filepath.join(outputpath, fmt.sprintf("%s.xlsx", stockcode))
if err := f.saveas(excelpath); err != nil {
return fmt.errorf("保存excel文件失败: %v", err)
}
}
}
fmt.printf("完成导出%s数据,共处理 %d 个文件\n", sheetprefix, filecount)
return nil
}
func (dp *dataprocessor) exportdaydatatoexcel(outputpath string) error {
csvdir := filepath.join(os.getenv("home"), "tdx_export", "day")
files, err := os.readdir(csvdir)
if err != nil {
return fmt.errorf("读取csv目录失败: %v", err)
}
filecount := 0
for _, file := range files {
if !file.isdir() && strings.hassuffix(strings.tolower(file.name()), ".csv") {
stockcode := strings.trimsuffix(file.name(), ".csv")
if stockcode == "all_codes" {
continue
}
filecount++
fmt.printf("正在处理日线数据,股票代码:%s (%d/%d)\n", stockcode, filecount, len(files)-1)
// 创建新的excel文件
f := excelize.newfile()
defer f.close()
// 设置sheet名称
sheetname := fmt.sprintf("%s_日线", stockcode)
index, err := f.newsheet(sheetname)
if err != nil {
return fmt.errorf("创建sheet失败: %v", err)
}
f.deletesheet("sheet1")
f.setactivesheet(index)
// 写入表头
headers := []string{"日期", "开盘价", "最高价", "最低价", "收盘价", "成交额", "成交量"}
for i, header := range headers {
cell := fmt.sprintf("%c1", 'a'+i)
f.setcellvalue(sheetname, cell, header)
}
// 读取csv数据
csvpath := filepath.join(csvdir, file.name())
csvfile, err := os.open(csvpath)
if err != nil {
return fmt.errorf("打开csv文件失败 %s: %v", file.name(), err)
}
reader := csv.newreader(csvfile)
reader.read() // 跳过标题行
row := 2 // 从第2行开始写入数据
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
csvfile.close()
return fmt.errorf("读取csv记录��败: %v", err)
}
// 写入数据行
for i, value := range record {
cell := fmt.sprintf("%c%d", 'a'+i, row)
f.setcellvalue(sheetname, cell, value)
}
row++
}
csvfile.close()
// 保存excel文件
excelpath := filepath.join(outputpath, fmt.sprintf("%s.xlsx", stockcode))
if err := f.saveas(excelpath); err != nil {
return fmt.errorf("保存excel文件失败: %v", err)
}
}
}
fmt.printf("完成导出日线数据,共处理 %d 个文件\n", filecount)
return nil
}
func (dp *dataprocessor) exporttopostgres(dbconfig dbconfig, opts exportoptions) error {
connstr := fmt.sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
dbconfig.host, dbconfig.port, dbconfig.user, dbconfig.password, dbconfig.dbname)
db, err := sql.open("postgres", connstr)
if err != nil {
return fmt.errorf("连接数据库失败: %v", err)
}
defer db.close()
// 测试连接
if err := db.ping(); err != nil {
return fmt.errorf("数据库连接测试失败: %v", err)
}
// 根据选择创建对应的表
if err := dp.createselectedtables(db, opts.datatypes); err != nil {
return fmt.errorf("创建表失败: %v", err)
}
// 先导出到csv
if err := dp.transformdata(opts); err != nil {
return fmt.errorf("转换数据失败: %v", err)
}
// 根据选择导入数据
if opts.datatypes.day {
fmt.println("开始导入日线数据...")
if err := dp.exportdaydatatopostgres(db); err != nil {
return fmt.errorf("导出日线数据失败: %v", err)
}
}
if opts.datatypes.min5 {
fmt.println("开始导入5分钟线数据...")
if err := dp.exportmindatatopostgres(db, "5min", "fivemin"); err != nil {
return fmt.errorf("导出5分钟数据失败: %v", err)
}
}
if opts.datatypes.min1 {
fmt.println("开始导入1分钟线数据...")
if err := dp.exportmindatatopostgres(db, "1min", "onemin"); err != nil {
return fmt.errorf("导出1分钟数据失败: %v", err)
}
}
return nil
}
// 只创建选中的表
func (dp *dataprocessor) createselectedtables(db *sql.db, types datatypes) error {
if types.day {
if err := dp.createdaytable(db); err != nil {
return err
}
}
if types.min1 {
if err := dp.createmintable(db, "1min"); err != nil {
return err
}
}
if types.min5 {
if err := dp.createmintable(db, "5min"); err != nil {
return err
}
}
return nil
}
func (dp *dataprocessor) createdaytable(db *sql.db) error {
// 日线数据表
_, err := db.exec(`
create table if not exists stock_day_data (
代码 text,
日期 date,
开盘价 numeric(10,2),
最高价 numeric(10,2),
最低价 numeric(10,2),
收盘价 numeric(10,2),
成交额 numeric(16,2),
成交量 bigint,
constraint stock_day_data_key unique (代码, 日期)
)
`)
if err != nil {
return err
}
return nil
}
func (dp *dataprocessor) createmintable(db *sql.db, period string) error {
// 分钟线数据表
_, err := db.exec(fmt.sprintf(`
create table if not exists stock_%s_data (
代码 text,
日期 date,
时间 time,
开盘价 numeric(10,2),
最高价 numeric(10,2),
最低价 numeric(10,2),
收盘价 numeric(10,2),
成交额 numeric(16,2),
成交量 bigint,
constraint stock_%s_data_key unique (代码, 日期, 时间)
)
`, period, period))
if err != nil {
return err
}
return nil
}
func (dp *dataprocessor) exportdaydatatopostgres(db *sql.db) error {
stmt, err := db.prepare(`
insert into stock_day_data (代码, 日期, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量)
values ($1, $2, $3, $4, $5, $6, $7, $8)
on conflict (代码, 日期) do update set
开盘价 = excluded.开盘价,
最高价 = excluded.最高价,
最低价 = excluded.最低价,
收盘价 = excluded.收盘价,
成交额 = excluded.成交额,
成交量 = excluded.成交量
`)
if err != nil {
return err
}
defer stmt.close()
// 读取csv文件并导入数据
csvdir := filepath.join(os.getenv("home"), "tdx_export", "day")
return dp.importcsvtopostgres(csvdir, stmt, false)
}
func (dp *dataprocessor) exportmindatatopostgres(db *sql.db, period string, dirname string) error {
stmt, err := db.prepare(fmt.sprintf(`
insert into stock_%s_data (代码, 日期, 时间, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
on conflict (代码, 日期, 时间) do update set
开盘价 = excluded.开盘价,
最高价 = excluded.最高价,
最低价 = excluded.最低价,
收���价 = excluded.收盘价,
成交额 = excluded.成交额,
成交量 = excluded.成交量
`, period))
if err != nil {
return err
}
defer stmt.close()
// 读取csv文件并导入数据
csvdir := filepath.join(os.getenv("home"), "tdx_export", dirname)
return dp.importcsvtopostgres(csvdir, stmt, true)
}
func (dp *dataprocessor) importcsvtopostgres(csvdir string, stmt *sql.stmt, hastime bool) error {
files, err := os.readdir(csvdir)
if err != nil {
return err
}
// 计算总文件数(排除 all_codes.csv)
totalfiles := 0
for _, file := range files {
if !file.isdir() && strings.hassuffix(file.name(), ".csv") && file.name() != "all_codes.csv" {
totalfiles++
}
}
fmt.printf("开始导入数据,共 %d 个文件需要处理\n", totalfiles)
processedfiles := 0
for _, file := range files {
if !file.isdir() && strings.hassuffix(file.name(), ".csv") {
stockcode := strings.trimsuffix(file.name(), ".csv")
if stockcode == "all_codes" {
continue
}
processedfiles++
fmt.printf("正在处理 [%d/%d] %s\n", processedfiles, totalfiles, stockcode)
csvfile, err := os.open(filepath.join(csvdir, file.name()))
if err != nil {
return err
}
reader := csv.newreader(csvfile)
reader.read() // 跳过标题行
recordcount := 0
for {
record, err := reader.read()
if err == io.eof {
break
}
if err != nil {
csvfile.close()
return err
}
// 转换数据类型
values := make([]interface{}, 0)
values = append(values, stockcode, record[0])
if hastime {
values = append(values, record[1])
record = record[2:]
}
for _, v := range record[1:] {
val, _ := strconv.parsefloat(v, 64)
values = append(values, val)
}
if _, err := stmt.exec(values...); err != nil {
csvfile.close()
return err
}
recordcount++
}
csvfile.close()
fmt.printf("完成处理 %s,导入 %d 条记录\n", stockcode, recordcount)
}
}
fmt.printf("数据导入完成,共处理 %d 个文件\n", processedfiles)
return nil
}settings.go
package config
import (
"encoding/json"
"os"
"path/filepath"
)
type exportinfo struct {
lastpath string `json:"last_path"` // 上次导出路径
lasttime string `json:"last_time"` // 上次导出时间
}
// 添加数据库连接配置结构
type dbconfig struct {
host string `json:"host"`
port int `json:"port"`
user string `json:"user"`
password string `json:"password"`
dbname string `json:"dbname"`
}
type settings struct {
tdxpath string `json:"tdx_path"` // 通达信数据路径
exportpath string `json:"export_path"` // 导出数据保存路径
exportpaths map[string]exportinfo `json:"export_paths"` // 不同格式的导出信息
dbconfig dbconfig `json:"db_config"` // 数据库连接配置
}
func newsettings() *settings {
// 默认导出到用户目录下的 tdx_export
homedir, _ := os.userhomedir()
return &settings{
exportpath: filepath.join(homedir, "tdx_export"),
exportpaths: make(map[string]exportinfo),
dbconfig: dbconfig{
host: "localhost",
port: 5432,
user: "postgres",
dbname: "tdx_data",
},
}
}
// updateexportinfo 更新导出信息
func (s *settings) updateexportinfo(format, path string, exporttime string) {
s.exportpaths[format] = exportinfo{
lastpath: path,
lasttime: exporttime,
}
}
// getlastexportinfo 获取上次导出信息
func (s *settings) getlastexportinfo(format string) (exportinfo, bool) {
info, exists := s.exportpaths[format]
return info, exists
}
func getconfigpath() string {
// 获取当前工作目录
currentdir, err := os.getwd()
if err != nil {
return ""
}
// 创建配置目录
configdir := filepath.join(currentdir, "config")
if err := os.mkdirall(configdir, 0755); err != nil {
return ""
}
// 返回配置文件完整路径
return filepath.join(configdir, "settings.json")
}
func savesettings(settings *settings) error {
configpath := getconfigpath()
// 格式化 json 以便于阅读和编辑
data, err := json.marshalindent(settings, "", " ")
if err != nil {
return err
}
return os.writefile(configpath, data, 0644)
}
func loadsettings() (*settings, error) {
configpath := getconfigpath()
// 如果配置文件不存在,创建默认配置
if _, err := os.stat(configpath); os.isnotexist(err) {
settings := newsettings()
if err := savesettings(settings); err != nil {
return nil, err
}
return settings, nil
}
data, err := os.readfile(configpath)
if err != nil {
return newsettings(), nil
}
var settings settings
if err := json.unmarshal(data, &settings); err != nil {
return newsettings(), nil
}
// 确保 exportpaths 已初始化
if settings.exportpaths == nil {
settings.exportpaths = make(map[string]exportinfo)
}
return &settings, nil
} settings.json
{
"tdx_path": "/users/apple/downloads/tdx",
"export_path": "/users/apple/downloads/tdx/exportdata",
"export_paths": {
"csv": {
"last_path": "/users/apple/downloads/tdx/exportdata",
"last_time": "2024-10-08"
},
"sqlite": {
"last_path": "/users/apple/downloads/tdx/exportdata",
"last_time": "2024-10-08"
}
},
"db_config": {
"host": "127.0.0.1",
"port": 5432,
"user": "postgres",
"password": "postgres",
"dbname": "stock"
}
}到此这篇关于基于go中fyne gui的通达信数据导出工具的文章就介绍到这了,更多相关go 通达信数据导出工具内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论