
Batch-Downloading SSE and SZSE Annual or Semi-Annual Reports with Python and Building a Standalone Downloader .exe

Preview

(Screenshots omitted.)

Full code

import requests
import csv
import time
import re
import math
import os

START_DATE = input('Announcement date from (yyyy-mm-dd): ')
END_DATE = input('Announcement date to (yyyy-mm-dd): ')
OUT_DIR = input('Output directory: ')
OUTPUT_FILENAME = input('Index file name: ')
# Market boards: sh = Shanghai; shmb = SSE Main Board; szse = Shenzhen;
# szmb = SZSE Main Board; szzx = SME Board; szcy = ChiNext
PLATE = input('Market board (sh/shmb/szse/szmb/szzx/szcy): ')
# Announcement categories: category_scgkfx_szsh = IPO and listing;
# category_ndbg_szsh = annual report; category_bndbg_szsh = semi-annual report
CATEGORY = input('Announcement category: ')
keyword = input('Filter keyword: ')

URL = 'http://www.cninfo.com.cn/new/hisAnnouncement/query'
HEADER = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36',
    'X-Requested-With': 'XMLHttpRequest'
}
MAX_PAGESIZE = 30        # records requested per page
MAX_RELOAD_TIMES = 5     # retry limit for failed requests
RESPONSE_TIMEOUT = 10    # per-request timeout, in seconds


def standardize_dir(dir_str):
    # Make sure the directory exists and ends with a trailing slash.
    assert os.path.exists(dir_str), 'Directory "' + str(dir_str) + '" does not exist!'
    if dir_str[-1] != '/':
        return dir_str + '/'
    else:
        return dir_str



def get_response(page_num, return_total_count=False):
    query = {
        'pageNum': page_num,
        'pageSize': MAX_PAGESIZE,
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': PLATE,
        'stock': '',
        'searchkey': '',
        'category': CATEGORY,
        'seDate': START_DATE + '~' + END_DATE,
        'sortName': '',
        'sortType': '',
        'showTitle': '',
        'limit': '',
    }
    result_list = []
    reloading = 0
    while True:
        reloading += 1
        if reloading > MAX_RELOAD_TIMES:
            raise RuntimeError('Request failed ' + str(MAX_RELOAD_TIMES) + ' times: ' + URL)
        try:
            # Note: requests.post(URL, query, HEADER) would send HEADER as the
            # JSON body, so data= and headers= must be passed by keyword.
            r = requests.post(URL, data=query, headers=HEADER, timeout=RESPONSE_TIMEOUT)
        except Exception as e:
            print(e)
            continue
        if r.status_code == requests.codes.ok and r.text != '':
            break
    my_query = r.json()
    try:
        r.close()
    except Exception as e:
        print(e)

    if return_total_count:
        return my_query['totalRecordNum']
    else:
        for each in my_query['announcements']:
            file_link = 'http://static.cninfo.com.cn/' + str(each['adjunctUrl'])
            file_name = __filter_illegal_filename(
                str(each['secCode']) + str(each['secName']) + str(each['announcementTitle']) + '.' + '(' + str(
                    each['adjunctSize']) + 'k)' +
                file_link[-file_link[::-1].find('.') - 1:]  # trailing slice keeps the file extension
            )
            result_list.append([file_name, file_link])
        return result_list

def __log_error(err_msg):
    # Print the error and append it to error.log (error_log is set in __main__).
    err_msg = str(err_msg)
    print(err_msg)
    with open(error_log, 'a', encoding='gb18030') as err_writer:
        err_writer.write(err_msg + '\n')

def __filter_illegal_filename(filename):
    # Replace characters that are illegal or awkward in Windows filenames, and
    # normalize full-width punctuation and letters to their ASCII equivalents.
    illegal_char = {
        ' ': '',
        '*': '',
        '/': '-',
        '\\': '-',
        ':': '-',
        '?': '-',
        '"': '',
        '<': '',
        '>': '',
        '|': '',
        '－': '-',
        '—': '-',
        '（': '(',
        '）': ')',
        'Ａ': 'A',
        'Ｂ': 'B',
        'Ｈ': 'H',
        ',': ',',
        '。': '.',
        ':': '-',
        '!': '_',
        '?': '-',
        '“': '"',
        '”': '"',
        '‘': '',
        '’': ''
    }
    for old, new in illegal_char.items():
        filename = filename.replace(old, new)
    return filename

if __name__ == '__main__':
    out_dir = standardize_dir(OUT_DIR)
    error_log = out_dir + 'error.log'
    output_csv_file = out_dir + OUTPUT_FILENAME.replace('/', '') + '_' + \
                      START_DATE.replace('-', '') + '-' + END_DATE.replace('-', '') + '.csv'

    item_count = get_response(1, True)
    assert item_count, 'No records returned; please restart this script!'
    begin_pg = 1
    end_pg = int(math.ceil(item_count / MAX_PAGESIZE))
    print('Page count: ' + str(end_pg) + '; item count: ' + str(item_count) + '.')
    time.sleep(2)
    # First pass: write one [file_name, file_link] row per matching report.
    with open(output_csv_file, 'w', newline='', encoding='gb18030') as fa:
        writer = csv.writer(fa)
        for i in range(begin_pg, end_pg + 1):
            row = get_response(i)  # return_total_count defaults to False
            for r in row:
                # Skip abstracts ('摘要'), cancelled announcements ('取消'),
                # and titles that do not contain the keyword.
                if re.search('摘要', r[0]) or re.search('取消', r[0]) or not re.search(keyword, r[0]):
                    continue
                writer.writerow(r)
                print(r)

    # Second pass: re-read the index (same gb18030 encoding it was written
    # with) and download each PDF into the output directory.
    with open(output_csv_file, mode='r', encoding='gb18030', newline='') as f:
        reader = csv.reader(f)
        for row in reader:
            filename = row[0]
            link = row[1]
            r = requests.get(link, stream=True, timeout=RESPONSE_TIMEOUT)
            with open(out_dir + filename, 'wb') as fd:
                for chunk in r.iter_content(102400):
                    fd.write(chunk)

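For reference, a run of this script might look like the console session below; every value shown is illustrative, not a required input:

Announcement date from (yyyy-mm-dd): 2019-01-01
Announcement date to (yyyy-mm-dd): 2019-12-31
Output directory: D:/reports
Index file name: ndbg
Market board (sh/shmb/szse/szmb/szzx/szcy): sh
Announcement category: category_ndbg_szsh
Filter keyword: 2018
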
The approach above first saves every company name and report link into a CSV index, then reads the CSV back and downloads the files. Alternatively, each report can be downloaded as soon as its row is written; the code is much the same:

import requests
import csv
import time
import re
import math
import os

START_DATE = input('Announcement date from (yyyy-mm-dd): ')
END_DATE = input('Announcement date to (yyyy-mm-dd): ')
OUT_DIR = input('Output directory: ')
OUTPUT_FILENAME = input('Index file name: ')
# Market boards: sh = Shanghai; shmb = SSE Main Board; szse = Shenzhen;
# szmb = SZSE Main Board; szzx = SME Board; szcy = ChiNext
PLATE = input('Market board (sh/shmb/szse/szmb/szzx/szcy): ')
# Announcement categories: category_scgkfx_szsh = IPO and listing;
# category_ndbg_szsh = annual report; category_bndbg_szsh = semi-annual report
CATEGORY = input('Announcement category: ')


URL = 'http://www.cninfo.com.cn/new/hisAnnouncement/query'
HEADER = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36',
    'X-Requested-With': 'XMLHttpRequest'
}
MAX_PAGESIZE = 30        # records requested per page
MAX_RELOAD_TIMES = 5     # retry limit for failed requests
RESPONSE_TIMEOUT = 10    # per-request timeout, in seconds


def standardize_dir(dir_str):
    # Make sure the directory exists and ends with a trailing slash.
    assert os.path.exists(dir_str), 'Directory "' + str(dir_str) + '" does not exist!'
    if dir_str[-1] != '/':
        return dir_str + '/'
    else:
        return dir_str



def get_response(page_num, return_total_count=False):
    query = {
        'pageNum': page_num,
        'pageSize': MAX_PAGESIZE,
        'column': 'szse',
        'tabName': 'fulltext',
        'plate': PLATE,
        'stock': '',
        'searchkey': '',
        'category': CATEGORY,
        'seDate': START_DATE + '~' + END_DATE,
        'sortName': '',
        'sortType': '',
        'showTitle': '',
        'limit': '',
    }
    result_list = []
    reloading = 0
    while True:
        reloading += 1
        if reloading > MAX_RELOAD_TIMES:
            raise RuntimeError('Request failed ' + str(MAX_RELOAD_TIMES) + ' times: ' + URL)
        try:
            # Note: requests.post(URL, query, HEADER) would send HEADER as the
            # JSON body, so data= and headers= must be passed by keyword.
            r = requests.post(URL, data=query, headers=HEADER, timeout=RESPONSE_TIMEOUT)
        except Exception as e:
            print(e)
            continue
        if r.status_code == requests.codes.ok and r.text != '':
            break
    my_query = r.json()
    try:
        r.close()
    except Exception as e:
        print(e)

    if return_total_count:
        return my_query['totalRecordNum']
    else:
        for each in my_query['announcements']:
            file_link = 'http://static.cninfo.com.cn/' + str(each['adjunctUrl'])
            file_name = __filter_illegal_filename(
                str(each['secCode']) + str(each['secName']) + str(each['announcementTitle']) + '.' + '(' + str(
                    each['adjunctSize']) + 'k)' +
                file_link[-file_link[::-1].find('.') - 1:]  # trailing slice keeps the file extension
            )
            result_list.append([file_name, file_link])
        return result_list

def __log_error(err_msg):
    # Print the error and append it to error.log (error_log is set in __main__).
    err_msg = str(err_msg)
    print(err_msg)
    with open(error_log, 'a', encoding='gb18030') as err_writer:
        err_writer.write(err_msg + '\n')

def __filter_illegal_filename(filename):
    # Replace characters that are illegal or awkward in Windows filenames, and
    # normalize full-width punctuation and letters to their ASCII equivalents.
    illegal_char = {
        ' ': '',
        '*': '',
        '/': '-',
        '\\': '-',
        ':': '-',
        '?': '-',
        '"': '',
        '<': '',
        '>': '',
        '|': '',
        '－': '-',
        '—': '-',
        '（': '(',
        '）': ')',
        'Ａ': 'A',
        'Ｂ': 'B',
        'Ｈ': 'H',
        ',': ',',
        '。': '.',
        ':': '-',
        '!': '_',
        '?': '-',
        '“': '"',
        '”': '"',
        '‘': '',
        '’': ''
    }
    for old, new in illegal_char.items():
        filename = filename.replace(old, new)
    return filename

if __name__ == '__main__':
    out_dir = standardize_dir(OUT_DIR)
    error_log = out_dir + 'error.log'
    output_csv_file = out_dir + OUTPUT_FILENAME.replace('/', '') + '_' + \
                      START_DATE.replace('-', '') + '-' + END_DATE.replace('-', '') + '.csv'

    item_count = get_response(1, True)
    assert item_count, 'No records returned; please restart this script!'
    begin_pg = 1
    end_pg = int(math.ceil(item_count / MAX_PAGESIZE))
    print('Before keyword filtering: page count: ' + str(end_pg) + '; item count: ' + str(item_count) + '.')
    time.sleep(2)
    # Keyword filter: e.g. enter 2018 to keep only the 2018 reports; to cover
    # several years at once, use a broader keyword such as '年' (year).
    keyword = input('Filter keyword: ')
    with open(output_csv_file, 'w', newline='', encoding='gb18030') as fa:
        writer = csv.writer(fa)
        for i in range(begin_pg, end_pg + 1):
            row = get_response(i)  # return_total_count defaults to False
            for r in row:
                # Skip abstracts ('摘要'), cancelled announcements ('取消'),
                # and titles that do not contain the keyword.
                if re.search('摘要', r[0]) or re.search('取消', r[0]) or not re.search(keyword, r[0]):
                    continue
                writer.writerow(r)
                print(r)
                # Download the report immediately into the output directory.
                pdf = requests.get(r[1], stream=True, timeout=RESPONSE_TIMEOUT)
                with open(out_dir + r[0], 'wb') as fd:
                    for chunk in pdf.iter_content(102400):
                        fd.write(chunk)

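Note that __log_error is defined in both scripts but never called. A minimal sketch of putting it to use, assuming you want a single failed download to be logged and skipped rather than abort the whole run (this would replace the download lines inside the inner loop above):

# Replaces the plain download lines inside the inner for-loop:
try:
    pdf = requests.get(r[1], stream=True, timeout=RESPONSE_TIMEOUT)
    pdf.raise_for_status()  # surface HTTP errors such as 404
    with open(out_dir + r[0], 'wb') as fd:
        for chunk in pdf.iter_content(102400):
            fd.write(chunk)
except Exception as e:
    # Record the failure in error.log and move on to the next report.
    __log_error('Download failed: ' + r[1] + ' -> ' + str(e))
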
Building the downloader .exe

To package the script into a standalone executable, see the step-by-step instructions in the previous post:
https://blog.csdn.net/weixin_43636302/article/details/103555585
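
In short, assuming the packer from that post is PyInstaller, a one-file build looks like this (the script file name here is just an example):

pip install pyinstaller
# -F bundles the script and its dependencies into a single exe under dist/
pyinstaller -F annual_report_downloader.py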
