股票标示顶底、量价背离python代码

实现了利用DVI和MACD指标判断股票量价顶底背离的Python代码。主要完成了以下工作:

  1. 创建了 vi_macd_divergence.py 文件,实现了完整的分析系统
  2. 使用baostock作为数据源获取股票历史数据
  3. 实现了DVI(Directional Volume Indicator)指标的计算,该指标比普通VI更能反映成交量与价格方向的关系
  4. 计算了标准MACD指标作为辅助判断。开发了多种背离检测算法:
  • 价格与MACD的顶底背离
  • 价格与DVI的顶底背离
  • 量价背离(价格变动与成交量变化不一致的情况)
  1. 实现了可视化功能,绘制包含价格、成交量、DVI和MACD的综合图表,并在图上标记各种背离点
  2. 添加了详细的背离统计摘要输出功能代码已经成功运行,能够检测并显示各种背离信号,并将结果保存为图片文件。用户可以通过修改 main() 函数中的股票代码和时间范围来分析不同的股票。
import baostock as bs
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号

class StockAnalyzer:
    def __init__(self):
        # 登录baostock
        self.bs = bs
        lg = bs.login()
        print(f'登录状态: {lg.error_code} {lg.error_msg}')
        self.data = None
        self.stock_code = None
        self.start_date = None
        self.end_date = None
    
    def logout(self):
        # 登出baostock
        bs.logout()
    
    def get_stock_data(self, stock_code, start_date, end_date):
        """
        从baostock获取股票数据
        """
        self.stock_code = stock_code
        self.start_date = start_date
        self.end_date = end_date
        
        # 获取K线数据
        rs = bs.query_history_k_data_plus(
            stock_code,
            'date,open,high,low,close,volume',
            start_date=start_date,
            end_date=end_date,
            frequency='d',
            adjustflag='2'  # 前复权
        )
        
        # 处理数据
        data_list = []
        while (rs.error_code == '0') & rs.next():
            data_list.append(rs.get_row_data())
        
        if data_list:
            self.data = pd.DataFrame(data_list, columns=rs.fields)
            # 转换数据类型
            for col in ['open', 'high', 'low', 'close', 'volume']:
                self.data[col] = pd.to_numeric(self.data[col])
            self.data['date'] = pd.to_datetime(self.data['date'])
            self.data.set_index('date', inplace=True)
            
            print(f'成功获取 {stock_code} 的数据,共 {len(self.data)} 条记录')
            return True
        else:
            print(f'获取数据失败: {rs.error_code} {rs.error_msg}')
            return False
    
    def calculate_indicators(self):
        """
        计算DVI和MACD指标
        DVI (Directional Volume Indicator) 是一个量价指标,用于分析成交量和价格之间的关系
        """
        if self.data is None:
            print("请先获取股票数据")
            return False
        
        # 计算DVI指标
        # 1. 计算价格变化方向
        self.data['price_change'] = self.data['close'] - self.data['close'].shift(1)
        self.data['direction'] = np.where(self.data['price_change'] > 0, 1, 
                                        np.where(self.data['price_change'] < 0, -1, 0))
        
        # 2. 计算方向成交量
        self.data['directional_volume'] = self.data['volume'] * self.data['direction']
        
        # 3. 计算DVI的两个主要组件
        # DVI_MoneyFlow: 反映价格方向上的资金流动
        self.data['dvi_moneyflow'] = self.data['directional_volume'] * self.data['price_change'].abs()
        
        # 4. 计算DVI线
        # 短期移动平均 (通常使用20天)
        self.data['dvi_short'] = self.data['dvi_moneyflow'].rolling(window=20).sum()
        # 长期移动平均 (通常使用50天)
        self.data['dvi_long'] = self.data['dvi_moneyflow'].rolling(window=50).sum()
        # 标准化DVI
        self.data['dvi'] = (self.data['dvi_short'] - self.data['dvi_long']) / \
                          (self.data['dvi_short'].abs() + self.data['dvi_long'].abs() + 1e-10)
        
        # 5. 计算DVI信号线 (通常是DVI的10天移动平均)
        self.data['dvi_signal'] = self.data['dvi'].rolling(window=10).mean()
        
        # 计算MACD
        exp1 = self.data['close'].ewm(span=12, adjust=False).mean()
        exp2 = self.data['close'].ewm(span=26, adjust=False).mean()
        self.data['macd'] = exp1 - exp2
        self.data['signal'] = self.data['macd'].ewm(span=9, adjust=False).mean()
        self.data['histogram'] = self.data['macd'] - self.data['signal']
        
        return True
    
    def detect_divergence(self, lookback_period=20):
        """
        检测量价顶底背离
        - 顶背离: 价格创新高但指标不创新高
        - 底背离: 价格创新低但指标不创新低
        """
        if self.data is None or 'dvi' not in self.data.columns or 'macd' not in self.data.columns:
            print("请先获取股票数据并计算指标")
            return False
        
        # 初始化背离标记列
        self.data['price_top_divergence'] = 0     # 价格顶背离
        self.data['price_bottom_divergence'] = 0  # 价格底背离
        self.data['dvi_top_divergence'] = 0       # DVI顶背离
        self.data['dvi_bottom_divergence'] = 0    # DVI底背离
        self.data['macd_top_divergence'] = 0      # MACD顶背离
        self.data['macd_bottom_divergence'] = 0   # MACD底背离
        self.data['volume_price_divergence'] = 0  # 量价背离
        
        for i in range(lookback_period, len(self.data)):
            # 获取回顾期内的数据
            lookback_data = self.data.iloc[i-lookback_period:i]
            current_data = self.data.iloc[i]
            
            # 1. 价格与MACD顶底背离检测
            # 顶背离: 价格创新高,MACD未创新高
            if (current_data['high'] > lookback_data['high'].max() and 
                current_data['macd'] < lookback_data['macd'].max()):
                self.data['price_top_divergence'].iloc[i] = 1
                self.data['macd_top_divergence'].iloc[i] = 1
            
            # 底背离: 价格创新低,MACD未创新低
            if (current_data['low'] < lookback_data['low'].min() and 
                current_data['macd'] > lookback_data['macd'].min()):
                self.data['price_bottom_divergence'].iloc[i] = 1
                self.data['macd_bottom_divergence'].iloc[i] = 1
            
            # 2. 价格与DVI顶底背离检测
            # DVI顶背离: 价格创新高,DVI未创新高
            if (current_data['high'] > lookback_data['high'].max() and 
                current_data['dvi'] < lookback_data['dvi'].max()):
                self.data['dvi_top_divergence'].iloc[i] = 1
            
            # DVI底背离: 价格创新低,DVI未创新低
            if (current_data['low'] < lookback_data['low'].min() and 
                current_data['dvi'] > lookback_data['dvi'].min()):
                self.data['dvi_bottom_divergence'].iloc[i] = 1
            
            # 3. 量价背离检测
            # 上升趋势中的量价背离: 价格上涨但成交量减少
            if (current_data['close'] > current_data['open'] and i > 0 and 
                current_data['volume'] < self.data['volume'].iloc[i-1] * 0.9):
                self.data['volume_price_divergence'].iloc[i] = 1
            
            # 下降趋势中的量价背离: 价格下跌但成交量减少
            if (current_data['close'] < current_data['open'] and i > 0 and 
                current_data['volume'] < self.data['volume'].iloc[i-1] * 0.9):
                self.data['volume_price_divergence'].iloc[i] = -1
        
        return True
    
    def plot_indicators(self):
        """
        绘制价格、成交量、DVI和MACD指标图,并标记背离点
        """
        if self.data is None:
            print("请先获取股票数据")
            return False
        
        fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(15, 14), sharex=True)
        
        # 绘制价格图
        ax1.plot(self.data.index, self.data['close'], label='收盘价', color='blue')
        # 标记顶底背离
        top_div = self.data[self.data['price_top_divergence'] == 1]
        bottom_div = self.data[self.data['price_bottom_divergence'] == 1]
        ax1.scatter(top_div.index, top_div['high'], color='red', marker='v', s=150, label='顶背离', zorder=5)
        ax1.scatter(bottom_div.index, bottom_div['low'], color='green', marker='^', s=150, label='底背离', zorder=5)
        ax1.set_title(f'{self.stock_code} 价格与背离分析')
        ax1.set_ylabel('价格')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 绘制成交量图
        # 根据价格变动着色成交量柱
        colors = ['red' if close > open else 'green' for close, open in zip(self.data['close'], self.data['open'])]
        ax2.bar(self.data.index, self.data['volume'], color=colors, alpha=0.6, label='成交量')
        # 标记量价背离
        vol_div_pos = self.data[self.data['volume_price_divergence'] == 1]
        vol_div_neg = self.data[self.data['volume_price_divergence'] == -1]
        ax2.scatter(vol_div_pos.index, vol_div_pos['volume'], color='orange', marker='o', s=100, label='量价正背离', zorder=5)
        ax2.scatter(vol_div_neg.index, vol_div_neg['volume'], color='purple', marker='o', s=100, label='量价负背离', zorder=5)
        ax2.set_ylabel('成交量')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        # 绘制DVI指标图
        ax3.plot(self.data.index, self.data['dvi'], label='DVI', color='blue')
        ax3.plot(self.data.index, self.data['dvi_signal'], label='DVI Signal', color='red', linestyle='--')
        # 标记DVI背离
        dvi_top_div = self.data[self.data['dvi_top_divergence'] == 1]
        dvi_bottom_div = self.data[self.data['dvi_bottom_divergence'] == 1]
        ax3.scatter(dvi_top_div.index, dvi_top_div['dvi'], color='red', marker='v', s=100, label='DVI顶背离', zorder=5)
        ax3.scatter(dvi_bottom_div.index, dvi_bottom_div['dvi'], color='green', marker='^', s=100, label='DVI底背离', zorder=5)
        ax3.axhline(y=0, color='black', linestyle='--', alpha=0.3)
        ax3.set_ylabel('DVI指标')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        
        # 绘制MACD图
        ax4.plot(self.data.index, self.data['macd'], label='MACD', color='blue')
        ax4.plot(self.data.index, self.data['signal'], label='Signal', color='red')
        # 根据柱状图值着色
        histogram_colors = ['red' if x > 0 else 'green' for x in self.data['histogram']]
        ax4.bar(self.data.index, self.data['histogram'], label='Histogram', color=histogram_colors, alpha=0.3)
        # 标记MACD背离
        macd_top_div = self.data[self.data['macd_top_divergence'] == 1]
        macd_bottom_div = self.data[self.data['macd_bottom_divergence'] == 1]
        ax4.scatter(macd_top_div.index, macd_top_div['macd'], color='red', marker='v', s=100, label='MACD顶背离', zorder=5)
        ax4.scatter(macd_bottom_div.index, macd_bottom_div['macd'], color='green', marker='^', s=100, label='MACD底背离', zorder=5)
        ax4.axhline(y=0, color='black', linestyle='--', alpha=0.3)
        ax4.set_ylabel('MACD')
        ax4.set_xlabel('日期')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        # 设置日期格式
        locator = mdates.AutoDateLocator(minticks=10, maxticks=20)
        formatter = mdates.ConciseDateFormatter(locator)
        ax4.xaxis.set_major_locator(locator)
        ax4.xaxis.set_major_formatter(formatter)
        
        plt.tight_layout()
        plt.savefig(f'{self.stock_code}_dvi_macd_divergence.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return True
    
    def print_divergence_summary(self):
        """
        打印背离情况摘要
        """
        if self.data is None:
            print("请先获取股票数据")
            return False
        
        print(f"\n{self.stock_code} 背离分析摘要 ({self.start_date} 至 {self.end_date})")
        print(f"检测到的价格顶背离次数: {self.data['price_top_divergence'].sum()}")
        print(f"检测到的价格底背离次数: {self.data['price_bottom_divergence'].sum()}")
        print(f"检测到的DVI顶背离次数: {self.data['dvi_top_divergence'].sum()}")
        print(f"检测到的DVI底背离次数: {self.data['dvi_bottom_divergence'].sum()}")
        print(f"检测到的MACD顶背离次数: {self.data['macd_top_divergence'].sum()}")
        print(f"检测到的MACD底背离次数: {self.data['macd_bottom_divergence'].sum()}")
        print(f"检测到的量价正背离次数: {(self.data['volume_price_divergence'] == 1).sum()}")
        print(f"检测到的量价负背离次数: {(self.data['volume_price_divergence'] == -1).sum()}")
        
        # 最近的背离点
        recent_price_top = self.data[self.data['price_top_divergence'] == 1].index.max()
        recent_price_bottom = self.data[self.data['price_bottom_divergence'] == 1].index.max()
        recent_dvi_top = self.data[self.data['dvi_top_divergence'] == 1].index.max()
        recent_dvi_bottom = self.data[self.data['dvi_bottom_divergence'] == 1].index.max()
        
        if pd.notna(recent_price_top):
            print(f"\n最近的价格顶背离日期: {recent_price_top.strftime('%Y-%m-%d')}")
        if pd.notna(recent_price_bottom):
            print(f"最近的价格底背离日期: {recent_price_bottom.strftime('%Y-%m-%d')}")
        if pd.notna(recent_dvi_top):
            print(f"最近的DVI顶背离日期: {recent_dvi_top.strftime('%Y-%m-%d')}")
        if pd.notna(recent_dvi_bottom):
            print(f"最近的DVI底背离日期: {recent_dvi_bottom.strftime('%Y-%m-%d')}")
        
        return True

def main():
    # 创建分析器实例
    analyzer = StockAnalyzer()
    
    try:
        # 设置参数
        stock_code = "sh.600938"  # 浦发银行
        end_date = datetime.now().strftime('%Y-%m-%d')
        start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
        
        # 获取数据
        if analyzer.get_stock_data(stock_code, start_date, end_date):
            # 计算指标
            analyzer.calculate_indicators()
            
            # 检测背离
            analyzer.detect_divergence()
            
            # 打印摘要
            analyzer.print_divergence_summary()
            
            # 绘制图形
            analyzer.plot_indicators()
    
    finally:
        # 登出
        analyzer.logout()

if __name__ == "__main__":
    main()