跳轉到

Pipeline 因子架構 - Code 模板

本頁提供可直接使用的 Code Template,並標註需要自定義的部分。


📋 模板總覽

我們將因子挖掘流程拆解為五個模塊,請依照你的需求組裝:

| 模塊 (Module) | 核心功能 | 你的任務 (Action) |
|---|---|---|
| M1. 基礎建設 | 環境設定、股票池 | 🟢 設定參數(回測區間、數據 Bundle) |
| M2. 因子定義 | CustomFactor 運算邏輯 | 🔥 定義 Alpha(撰寫數學公式、資料清洗) |
| M3. 訊號組裝 | Pipeline、Filter | 🔥 合成因子(多因子結合、篩選、排名) |
| M4. 調倉邏輯 | schedule_function、rebalance | 🔵 選擇頻率(週換/月換/日換)與權重分配 |
| M5. 執行引擎 | run_algorithm、績效回測 | 🔒 直接執行(報酬率、最大回撤、夏普比率;本模板未計算 IC 值,需自行補充) |

🎯 完整模板

Module 1: 環境設定 & 股票池

# ====================================
# Module 1: 環境設定 & 股票池
# ====================================

import os
import pandas as pd
import numpy as np
import tejapi
import matplotlib.pyplot as plt
from logbook import Logger

# TEJ API settings
os.environ['TEJAPI_BASE'] = 'https://api.tej.com.tw'
os.environ['TEJAPI_KEY'] = 'your_key'  # 🔧 replace with your own key; do not commit a real one
tejapi.ApiConfig.api_key = os.getenv('TEJAPI_KEY')
tejapi.ApiConfig.api_base = os.getenv('TEJAPI_BASE')

# Logger
log = Logger('Strategy')

# Let matplotlib render Chinese labels and the minus sign correctly
plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei']
plt.rcParams['axes.unicode_minus'] = False

# ====================================
# Parameters
# ====================================
# 🔧 Customize: backtest period
start_date = '2019-01-01'
end_date = '2023-12-31'

# get_universe must be imported before *either* universe option uses it
# (calling it in option 1 before this import raised NameError).
from zipline.sources.TEJ_Api_Data import get_universe

# Pass the same UTC timestamps to both universe queries.
start_ts = pd.Timestamp(start_date, tz='utc')
end_ts = pd.Timestamp(end_date, tz='utc')

# 🔧 Customize: stock universe
# Option 1: Taiwan 50 constituents
tw50_list = get_universe(start=start_ts, end=end_ts, idx_id='IX0002')
print(f"台灣 50 成分股數量: {len(tw50_list)}")

# Option 2: listed (TSE) + OTC common stocks
pool = get_universe(
    start=start_ts,
    end=end_ts,
    mkt_bd_e=['TSE', 'OTC'],
    stktp_e='Common Stock'
)

# 🔧 Choose the universe once; both the environment variable and the ingest
# below use it, so they can no longer disagree.
universe_tickers = pool  # or tw50_list

# Environment variables for the TEJ data ingest
os.environ['mdate'] = f'{start_date} {end_date}'
os.environ['ticker'] = ' '.join(universe_tickers)

# ====================================
# Ingest price data
# ====================================
from zipline.data.run_ingest import simple_ingest

pools = universe_tickers + ['IR0001']  # add the benchmark index

print("正在準備 Zipline 資料...")
simple_ingest(
    name='tquant',
    tickers=pools,
    start_date=start_date.replace('-', ''),
    end_date=end_date.replace('-', '')
)
print("資料準備完成!")

Module 2: CustomFactor 定義 🔥

這是**最核心**的模塊,用來定義你的因子計算邏輯。

📌 模板 A:單一資料源因子

# ====================================
# Module 2A: 單一資料源因子
# ====================================

from zipline.pipeline import CustomFactor
from zipline.pipeline.data import EquityPricing
import numpy as np

class Momentum(CustomFactor):
    """
    Momentum factor: simple return over the lookback window.

    Formula: (latest close - first close) / first close
    """
    # 🔧 Customize: how many trading days of history to load
    window_length = 252  # about one year of trading days

    # 🔧 Customize: which columns to load
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        """
        Write each asset's window return into ``out``.

        Parameters
        ----------
        today : pd.Timestamp
            The date being computed.
        assets : np.array
            Asset identifiers.
        out : np.array
            Output buffer, filled in place with one value per asset.
        close : np.array
            Close-price matrix (window_length x number of assets).
        """
        # 🔥 Customize: replace with your own formula
        first_close = close[0]
        last_close = close[-1]
        out[:] = (last_close - first_close) / first_close


class Volatility(CustomFactor):
    """
    Volatility factor: annualized standard deviation of daily returns
    over the lookback window.
    """
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        # 🔥 Customize
        prior = close[:-1]
        # Daily simple returns: one row fewer than the price window
        daily = (close[1:] - prior) / prior
        # NaN-aware population std (ddof=0), annualized with sqrt(252)
        out[:] = np.sqrt(252) * np.nanstd(daily, axis=0)


class AverageDollarVolume(CustomFactor):
    """
    Average traded value (close x volume) over the lookback window.
    """
    window_length = 30
    inputs = [EquityPricing.close, EquityPricing.volume]

    def compute(self, today, assets, out, close, volume):
        # 🔥 Customize
        # Per-day traded value, averaged over the window (NaN days skipped)
        out[:] = np.nanmean(close * volume, axis=0)

📌 模板 B:多資料源因子

# ====================================
# Module 2B: 多資料源因子
# ====================================

class PriceToVolume(CustomFactor):
    """
    Price-to-volume ratio: latest close / latest volume.

    Assets with zero volume on the latest day get NaN. Adding a tiny epsilon
    to the denominator would instead give them a ratio of about close * 1e10,
    which dominates any ranking or top() filter.
    """
    window_length = 1  # only the latest day is needed
    inputs = [EquityPricing.close, EquityPricing.volume]

    def compute(self, today, assets, out, close, volume):
        # 🔥 Customize

        # Latest values
        latest_close = close[-1]
        latest_volume = volume[-1]

        # Ratio, undefined (NaN) where there was no volume. np.where evaluates
        # both branches, so silence the 0-division warning it would raise.
        with np.errstate(divide='ignore', invalid='ignore'):
            ratio = np.where(latest_volume > 0, latest_close / latest_volume, np.nan)

        out[:] = ratio


class RSI(CustomFactor):
    """
    RSI: relative strength index.

    Uses simple averages of gains and losses over the last
    ``window_length - 1`` price changes (not Wilder's smoothing).
    """
    window_length = 15  # a 14-day RSI needs 15 closes (14 changes)
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        # 🔥 Customize

        # Day-over-day price changes
        delta = np.diff(close, axis=0)

        # Split into gains and losses. np.clip keeps NaN as NaN, so changes
        # involving a missing price are excluded from the averages below.
        # Writing them as 0 would count them as flat days.
        gains = np.clip(delta, 0, None)
        losses = np.clip(-delta, 0, None)

        # NaN-aware averages. An asset with no valid change at all yields
        # 0 / 0 = NaN, and therefore a NaN RSI.
        n_valid = np.count_nonzero(~np.isnan(delta), axis=0)
        with np.errstate(divide='ignore', invalid='ignore'):
            avg_gain = np.nansum(gains, axis=0) / n_valid
            avg_loss = np.nansum(losses, axis=0) / n_valid

            # RS and RSI. The epsilon avoids 0-division when there were no
            # losses (RSI then approaches 100).
            rs = avg_gain / (avg_loss + 1e-10)
            rsi = 100 - (100 / (1 + rs))

        out[:] = rsi

📌 模板 C:排名因子

# ====================================
# Module 2C: 排名因子
# ====================================

class MomentumRank(CustomFactor):
    """
    Momentum rank: window return mapped to a percentile in [0, 1].

    The highest return maps to 1 and the lowest to 0. Ties share their
    average rank; a single rankable asset gets 0.5. Assets whose return is
    not finite (missing or zero prices) stay NaN and are not ranked.

    NOTE: ranks are taken across every asset passed to compute(). To rank
    only within a screen, use the Pipeline expression
    ``Momentum().rank(mask=...)`` instead.
    """
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        # 🔥 Customize
        from scipy.stats import rankdata

        # Window returns; a zero or NaN first price produces inf or NaN here
        with np.errstate(divide='ignore', invalid='ignore'):
            returns = (close[-1] - close[0]) / close[0]

        # Rank only the finite returns. Depending on the SciPy version, a NaN
        # passed to rankdata either blanks the whole result or is ranked as
        # the largest value.
        ranks = np.full(returns.shape, np.nan)
        valid = np.isfinite(returns)
        n_valid = np.count_nonzero(valid)
        if n_valid == 1:
            ranks[valid] = 0.5
        elif n_valid > 1:
            # (rank - 1) / (n - 1) spans exactly [0, 1]; rank / n would not reach 0
            ranks[valid] = (rankdata(returns[valid]) - 1) / (n_valid - 1)

        out[:] = ranks

📌 模板 D:組合因子

# ====================================
# Module 2D: 組合因子
# ====================================

class QualityScore(CustomFactor):
    """
    Quality score: weighted blend of min-max scaled ROE and gross margin.

    Requires external (TEJ) fundamentals. This template declares no
    ``inputs``, so supply the two columns when instantiating, ROE first and
    gross margin second, e.g.
    ``QualityScore(inputs=[<ROE column>, <gross-margin column>])``.
    """
    window_length = 1

    # 🔧 Customize: blend weights (keep them summing to 1 for a [0, 1] score)
    roe_weight = 0.6
    gross_margin_weight = 0.4

    @staticmethod
    def _min_max_scale(values):
        """Scale ``values`` to [0, 1], ignoring NaN; the epsilon avoids 0/0 when all are equal."""
        lo = np.nanmin(values)
        hi = np.nanmax(values)
        return (values - lo) / (hi - lo + 1e-10)

    def compute(self, today, assets, out, roe, gross_margin):
        # 🔥 Customize

        # Scale the latest ROE and gross margin to [0, 1]
        roe_scaled = self._min_max_scale(roe[-1])
        gm_scaled = self._min_max_scale(gross_margin[-1])

        # Weighted combination (weights set on the class above)
        out[:] = self.roe_weight * roe_scaled + self.gross_margin_weight * gm_scaled

Module 3: Pipeline 組合 🔥

# ====================================
# Module 3: Pipeline 組合
# ====================================

from zipline.pipeline import Pipeline
from zipline.pipeline.filters import StaticAssets

def make_pipeline(top_n_liquidity=100, min_momentum=0.0, max_volatility=0.5):
    """
    Build the factor Pipeline.

    Flow:
    1. Define the factors.
    2. Define the screen (filters).
    3. Assemble the Pipeline.

    The defaults reproduce the original hard-coded screen. Pass other values
    to tune it without editing the body.

    Parameters
    ----------
    top_n_liquidity : int, default 100
        Liquidity filter: keep the N assets with the largest AverageDollarVolume.
    min_momentum : float, default 0.0
        Keep assets whose Momentum is strictly greater than this.
    max_volatility : float, default 0.5
        Keep assets whose annualized Volatility is strictly less than this.

    Returns
    -------
    Pipeline
        Columns 'momentum', 'volatility' and 'dollar_volume', restricted to the screen.
    """
    # ========================================
    # Step 1: define factors
    # ========================================
    # 🔥 Customize: choose the factors to use

    momentum = Momentum()
    volatility = Volatility()
    dollar_volume = AverageDollarVolume()

    # ========================================
    # Step 2: define the screen
    # ========================================
    # 🔥 Customize: filter conditions

    # Filter 1: top N by traded value (liquidity)
    tradable = dollar_volume.top(top_n_liquidity)

    # Filter 2: momentum above the threshold (positive return by default)
    positive_momentum = momentum > min_momentum

    # Filter 3: volatility below the threshold (not too volatile)
    low_volatility = volatility < max_volatility

    # Combine filters (AND)
    screen = tradable & positive_momentum & low_volatility

    # Or use a fixed list of stocks
    # universe = StaticAssets(symbols(['2330', '2317', '2454']))
    # screen = universe

    # ========================================
    # Step 3: assemble the Pipeline
    # ========================================
    pipe = Pipeline(
        columns={
            'momentum': momentum,
            'volatility': volatility,
            'dollar_volume': dollar_volume
        },
        screen=screen
    )

    return pipe

Module 4: 調倉邏輯

# ====================================
# Module 4: 調倉邏輯
# ====================================

from zipline.api import (
    attach_pipeline, pipeline_output,
    order_target_percent, set_commission, set_slippage,
    record, schedule_function, date_rules, time_rules
)
from zipline.finance import commission, slippage

def initialize(context):
    """
    Set up trading costs, the benchmark, the pipeline and the rebalance schedule.
    """
    from zipline.api import set_benchmark, symbol

    # Trading costs. Taiwan's brokerage fee (0.1425%) is a rate on traded
    # value, so it needs a per-dollar model. commission.PerShare(cost=0.001425)
    # charged only 0.001425 TWD per share.
    # NOTE(review): PerDollar applies no minimum fee and no securities
    # transaction tax. zipline-tej may offer commission.Custom_TW_Commission
    # for those; confirm it exists in your installed version before switching.
    set_commission(commission.PerDollar(cost=0.001425))
    set_slippage(slippage.VolumeShareSlippage(volume_limit=0.025, price_impact=0.1))

    # Benchmark: IR0001 is ingested alongside the stock pool for this purpose.
    # Setting it makes the results' benchmark columns reflect that index.
    set_benchmark(symbol('IR0001'))

    # Attach the Pipeline
    attach_pipeline(make_pipeline(), 'my_pipeline')

    # 🔧 Customize: rebalance frequency
    # Weekly, at the open of the first trading day of each week
    schedule_function(
        rebalance,
        date_rules.week_start(),
        time_rules.market_open()
    )

    # Or on the first trading day of each month
    # schedule_function(
    #     rebalance,
    #     date_rules.month_start(),
    #     time_rules.market_open()
    # )

    # Or every day
    # schedule_function(
    #     rebalance,
    #     date_rules.every_day(),
    #     time_rules.market_open()
    # )


def before_trading_start(context, data):
    """
    Pre-market hook: fetch today's pipeline output and choose the stocks to hold.

    Sets ``context.output`` (the full pipeline frame, used for weighting) and
    ``context.stocks`` (list of selected assets).
    """
    output = pipeline_output('my_pipeline')

    # Keep the full frame so rebalance() can weight by factor values
    context.output = output

    # 🔥 Customize: selection rule

    # Option 1: top N by momentum
    leaders = output.nlargest(20, 'momentum')
    context.stocks = leaders.index.tolist()

    # Option 2: composite rank
    # output['rank'] = output['momentum'].rank() + output['volatility'].rank(ascending=False)
    # context.stocks = output.nlargest(20, 'rank').index.tolist()

    # Option 3: everything above the median
    # high_momentum = output[output['momentum'] > output['momentum'].median()]
    # context.stocks = high_momentum.index.tolist()


def rebalance(context, data):
    """
    Rebalance to ``context.stocks``: close other holdings, then order each
    target to its weight.

    Reads ``context.stocks`` (and ``context.output`` for the commented
    weighting options), both set in before_trading_start. Records
    ``num_positions`` and ``leverage``.
    """
    targets = list(context.stocks)
    target_set = set(targets)  # O(1) membership test for the sell loop

    # ========================================
    # Step 1: target weights
    # ========================================
    # 🔥 Customize: weighting method

    # Option 1: equal weight (an empty selection gives an empty dict, no 0-division)
    target_weights = {stock: 1.0 / len(targets) for stock in targets}

    # Option 2: factor weighting (higher momentum -> larger weight)
    # momentum_values = context.output.loc[context.stocks, 'momentum']
    # total_momentum = momentum_values.sum()
    # target_weights = {}
    # for stock in context.stocks:
    #     target_weights[stock] = momentum_values[stock] / total_momentum

    # Option 3: inverse-volatility weighting
    # volatility_values = context.output.loc[context.stocks, 'volatility']
    # inv_vol = 1 / volatility_values
    # total_inv_vol = inv_vol.sum()
    # target_weights = {}
    # for stock in context.stocks:
    #     target_weights[stock] = inv_vol[stock] / total_inv_vol

    # ========================================
    # Step 2: exit stocks no longer selected
    # ========================================
    # Skip assets that cannot trade today (e.g. halted or delisted), since
    # ordering them can fail. They are retried at the next rebalance.
    for stock in list(context.portfolio.positions):
        if stock not in target_set and data.can_trade(stock):
            order_target_percent(stock, 0)

    # ========================================
    # Step 3: buy / resize targets
    # ========================================
    for stock, weight in target_weights.items():
        if data.can_trade(stock):
            order_target_percent(stock, weight)

    # ========================================
    # Step 4: record diagnostics
    # ========================================
    record(
        num_positions=len(context.portfolio.positions),
        leverage=context.account.leverage
    )

Module 5: 績效分析 & 執行回測

# ====================================
# Module 5: 績效分析函數
# ====================================

def analyze(context, perf):
    """
    Plot the backtest in four stacked panels and save ``perf`` to CSV.

    The panels show:
    - portfolio value;
    - cumulative return vs. the benchmark;
    - number of positions;
    - leverage.

    The last two come from the ``record`` call in ``rebalance``. Writes
    ``pipeline_performance.csv`` to the working directory.
    """
    import matplotlib.pyplot as plt

    def _decorate(ax, ylabel, title):
        # Shared y-label, bold title and light grid used by every panel
        ax.set_ylabel(ylabel, fontsize=12)
        ax.set_title(title, fontsize=14, fontweight='bold')
        ax.grid(True, alpha=0.3)

    fig = plt.figure(figsize=(16, 12))

    # Panel 1: portfolio value
    value_ax = fig.add_subplot(411)
    perf['portfolio_value'].plot(ax=value_ax, linewidth=2)
    _decorate(value_ax, 'Portfolio Value (TWD)', 'Portfolio Performance')

    # Panel 2: compounded daily returns, strategy vs. benchmark
    returns_ax = fig.add_subplot(412)
    strategy_curve = (1 + perf['returns']).cumprod() - 1
    benchmark_curve = (1 + perf['benchmark_return']).cumprod() - 1
    strategy_curve.plot(ax=returns_ax, label='Strategy', linewidth=2, color='#2E86AB')
    benchmark_curve.plot(ax=returns_ax, label='Benchmark', linewidth=2, alpha=0.7, color='#A23B72')
    _decorate(returns_ax, 'Cumulative Returns', 'Strategy vs Benchmark')
    returns_ax.legend(loc='upper left', fontsize=11)
    returns_ax.axhline(0, color='black', linewidth=0.8, linestyle='--', alpha=0.5)

    # Panel 3: number of positions
    count_ax = fig.add_subplot(413)
    perf['num_positions'].plot(ax=count_ax, linewidth=2, color='#F18F01')
    _decorate(count_ax, 'Number of Positions', 'Position Count')

    # Panel 4: leverage, with a reference line at 1x
    leverage_ax = fig.add_subplot(414)
    perf['leverage'].plot(ax=leverage_ax, linewidth=2, color='#6A4C93')
    _decorate(leverage_ax, 'Leverage', 'Portfolio Leverage')
    leverage_ax.set_xlabel('Date', fontsize=12)
    leverage_ax.axhline(1, color='red', linewidth=1, linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.show()

    # Save the full performance frame
    csv_path = 'pipeline_performance.csv'
    perf.to_csv(csv_path)
    print(f"\n績效數據已儲存至: {csv_path}")


# ====================================
# 執行回測
# ====================================
from zipline import run_algorithm

# Starting capital (TWD). It is defined once so the total-return calculation
# below always matches the value passed to run_algorithm.
capital_base = 1e7  # 10 million TWD

print("="*60)
print("開始回測 Pipeline 因子策略")
print("="*60)

results = run_algorithm(
    start=pd.Timestamp(start_date, tz='utc'),
    end=pd.Timestamp(end_date, tz='utc'),
    initialize=initialize,
    before_trading_start=before_trading_start,
    analyze=analyze,
    bundle='tquant',
    capital_base=capital_base,
)

print("\n回測完成!")

# ====================================
# Performance summary
# ====================================
print("\n========== 績效摘要 ==========")

total_return = (results['portfolio_value'].iloc[-1] / capital_base - 1) * 100
benchmark_return = results['benchmark_period_return'].iloc[-1] * 100

print(f"策略總報酬: {total_return:.2f}%")
print(f"基準報酬: {benchmark_return:.2f}%")
print(f"超額報酬: {(total_return - benchmark_return):.2f}%")
print(f"最大回撤: {results['max_drawdown'].min() * 100:.2f}%")
print(f"夏普比率: {results['sharpe'].iloc[-1]:.2f}")
print(f"平均持倉數: {results['num_positions'].mean():.0f}")

# ====================================
# Pyfolio analysis (optional)
# ====================================
# Keep the ImportError handler around the imports only. Otherwise an
# ImportError raised inside the analysis would be misreported as
# "pyfolio not installed".
try:
    import pyfolio as pf
    from pyfolio.utils import extract_rets_pos_txn_from_zipline
except ImportError:
    print("\n未安裝 pyfolio,略過詳細分析")
    print("若需完整報告,請執行: pip install pyfolio")
else:
    print("\n" + "="*60)
    print("Pyfolio 績效分析")
    print("="*60)

    try:
        returns, positions, transactions = extract_rets_pos_txn_from_zipline(results)
        benchmark_rets = results.benchmark_return

        pf.tears.create_full_tear_sheet(
            returns=returns,
            positions=positions,
            transactions=transactions,
            benchmark_rets=benchmark_rets
        )
    except Exception as e:
        # Best-effort report: surface the error but keep the script going
        print(f"\nPyfolio 分析錯誤: {e}")

🎯 使用指南

Step 1: 選擇因子類型

根據你的策略選擇對應的 CustomFactor 模板:

  • 模板 A:單一資料源(動量、波動率、成交量)
  • 模板 B:多資料源(價量比、RSI)
  • 模板 C:排名因子(相對排名)
  • 模板 D:組合因子(多因子加權)

Step 2: 實作 compute() 函數

修改 compute() 函數中的計算邏輯:

def compute(self, today, assets, out, close):
    # 1. Pull out the rows you need: first and last close of the window
    first_close, latest_close = close[0], close[-1]

    # 2. Compute the factor value for every asset at once
    factor_values = (latest_close - first_close) / first_close

    # 3. Write the result into the output buffer in place
    out[:] = factor_values

Step 3: 組合 Pipeline

在 make_pipeline() 中組合因子和篩選器:

def make_pipeline():
    """Skeleton: two factors, a combined screen, and the resulting Pipeline."""
    # Factors (replace MyFactor1 / MyFactor2 with your own CustomFactors)
    factor1, factor2 = MyFactor1(), MyFactor2()

    # Screen: factor1 positive AND factor2 among the top 100 assets
    positive_factor1 = factor1 > 0
    top_factor2 = factor2.top(100)

    return Pipeline(
        columns={'factor1': factor1, 'factor2': factor2},
        screen=positive_factor1 & top_factor2,
    )

Step 4: 設定調倉邏輯

選擇調倉頻率和權重計算方法:

# Rebalance frequency. This call belongs inside initialize(context).
schedule_function(
    rebalance,
    date_rules.week_start(),  # first trading day of each week (normally Monday)
    time_rules.market_open(),
)

# Weight calculation. This belongs inside rebalance(context, data); equal weight.
target_weights = {stock: 1.0 / len(context.stocks) for stock in context.stocks}


💡 進階技巧

技巧 1: 多因子組合

def make_pipeline(universe=None):
    """
    Multi-factor Pipeline: weighted blend of momentum, value and quality ranks.

    ``ValueFactor`` and ``QualityFactor`` are placeholders for your own
    CustomFactors.

    Parameters
    ----------
    universe : Filter, optional
        Assets to rank and keep. If omitted, the 100 assets with the highest
        ``AverageDollarVolume`` are used (``universe`` was previously
        referenced without ever being defined).

    Returns
    -------
    Pipeline
        A single 'score' column (higher = better), restricted to ``universe``.
    """
    if universe is None:
        universe = AverageDollarVolume().top(100)

    # Define the factors
    momentum = Momentum()
    value = ValueFactor()
    quality = QualityFactor()

    # Cross-sectional ranks within the universe (higher value -> higher rank).
    # Dividing every rank by the universe size would only rescale the
    # composite; it would not change its ordering.
    # NOTE(review): zipline Filters do not appear to offer .sum(). If you need
    # a [0, 1] score, verify the API before reintroducing the divisor.
    momentum_rank = momentum.rank(mask=universe)
    value_rank = value.rank(mask=universe)
    quality_rank = quality.rank(mask=universe)

    # Weighted composite (adjust the weights to taste)
    composite_score = (
        0.4 * momentum_rank +
        0.3 * value_rank +
        0.3 * quality_rank
    )

    return Pipeline(
        columns={'score': composite_score},
        screen=universe
    )

技巧 2: 產業中性

def before_trading_start(context, data):
    """
    Industry-neutral selection: keep the 5 highest-momentum stocks per industry.

    Assumes the pipeline output has an 'industry' column next to 'momentum'.
    TODO: confirm against your make_pipeline().
    """
    output = pipeline_output('my_pipeline')

    picks = []
    # sort=False visits industries in order of first appearance
    for _, members in output.groupby('industry', sort=False):
        picks += members.nlargest(5, 'momentum').index.tolist()

    context.stocks = picks

技巧 3: 動態調整持倉數

def before_trading_start(context, data):
    """
    Choose the top-momentum stocks, holding more names when the market is calm.

    ``calculate_market_volatility`` is not defined in this template; supply your own.
    """
    output = pipeline_output('my_pipeline')

    # (volatility upper bound, number of holdings), checked in order:
    # low volatility -> 30 names, medium -> 20, otherwise (high or NaN) -> 10.
    tiers = ((0.2, 30), (0.4, 20))
    market_volatility = calculate_market_volatility()
    num_stocks = next((count for bound, count in tiers if market_volatility < bound), 10)

    context.stocks = output.nlargest(num_stocks, 'momentum').index.tolist()

技巧 4: 風險平價

def rebalance(context, data):
    """
    Risk-parity style rebalance: weight each selected stock by inverse volatility.

    Reads ``context.stocks`` and the 'volatility' column of ``context.output``
    (both set in before_trading_start). Holdings that are no longer selected
    are closed. Selected stocks with zero or missing volatility get no weight.
    """
    selected = set(context.stocks)

    # Close positions that dropped out of the selection. Without this step
    # they were never sold and leverage could creep above 1.
    for stock in list(context.portfolio.positions):
        if stock not in selected and data.can_trade(stock):
            order_target_percent(stock, 0)

    volatility_values = context.output.loc[context.stocks, 'volatility']

    # 1 / vol is undefined for zero or NaN volatility; drop those names
    volatility_values = volatility_values[volatility_values > 0]
    if volatility_values.empty:
        return

    # Inverse-volatility weights, normalized to sum to 1
    inv_vol = 1 / volatility_values
    target_weights = inv_vol / inv_vol.sum()

    # Place the orders
    for stock, weight in target_weights.items():
        if data.can_trade(stock):
            order_target_percent(stock, weight)

📚 常用因子速查

動量類

# 簡單報酬率
class Returns(CustomFactor):
    """Simple return over the window: (last close - first close) / first close."""
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        start, end = close[0], close[-1]
        out[:] = (end - start) / start

# 年化報酬率
class AnnualizedReturns(CustomFactor):
    """
    Annualized (compound) return over the window.

    Formula: (last / first) ** (252 / n_intervals) - 1, where
    n_intervals = window_length - 1 daily steps. Unlike ``Returns``, this
    rescales the window return to a 252-trading-day year, so it stays
    comparable when window_length is changed.
    """
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        n_intervals = close.shape[0] - 1
        if n_intervals < 1:
            # A single close contains no return to annualize
            out[:] = np.nan
            return
        growth = close[-1] / close[0]
        out[:] = growth ** (252.0 / n_intervals) - 1

波動率類

# 標準差
class StdDev(CustomFactor):
    """Daily (non-annualized) standard deviation of simple returns, NaN-aware."""
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        prior = close[:-1]
        daily = (close[1:] - prior) / prior
        out[:] = np.nanstd(daily, axis=0)

# 年化波動率
class AnnualizedVolatility(CustomFactor):
    """Standard deviation of daily simple returns scaled by sqrt(252)."""
    window_length = 252
    inputs = [EquityPricing.close]

    def compute(self, today, assets, out, close):
        prior = close[:-1]
        daily = (close[1:] - prior) / prior
        daily_sd = np.nanstd(daily, axis=0)
        out[:] = daily_sd * np.sqrt(252)

成交量類

# 平均成交量
class AverageVolume(CustomFactor):
    """Mean daily volume over the window, skipping NaN days."""
    window_length = 30
    inputs = [EquityPricing.volume]

    def compute(self, today, assets, out, volume):
        mean_volume = np.nanmean(volume, axis=0)
        out[:] = mean_volume

# 成交量變化率
class VolumeChange(CustomFactor):
    """
    Day-over-day volume change: (today's volume - yesterday's) / yesterday's.

    Assets with no volume yesterday get NaN rather than the inf/NaN a
    division by zero would produce, since inf would dominate rankings.
    """
    window_length = 2
    inputs = [EquityPricing.volume]

    def compute(self, today, assets, out, volume):
        prev, curr = volume[-2], volume[-1]
        with np.errstate(divide='ignore', invalid='ignore'):
            change = (curr - prev) / prev
        out[:] = np.where(prev > 0, change, np.nan)

📚 相關資源


👉 Next Step: 選擇一個模板,參考對應的 case study,開始開發你的 Pipeline 策略!