import pandas as pd
import numpy as np
import scipy.stats
from colour import Color
import numpy as np
import pandas as pd
import plotly as py
import plotly.graph_objs as go
import plotly.offline as offline_py
offline_py.init_notebook_mode(connected=True)
import project_tests
# Colour palette looked up by name from the plotting helpers in this file.
color_scheme = dict(
    index='#B6B2CF',
    etf='#2D3ECF',
    tracking_error='#6F91DE',
    df_header='silver',
    df_value='white',
    df_line='silver',
    # (position, colour) stops of a colour scale running from 0 to 1.
    heatmap_colorscale=[(0, '#6F91DE'), (0.5, 'grey'), (1, 'red')],
    background_label='#9dbdd5',
    # End points of the gradient used when several traces share one chart.
    low_value='#B6B2CF',
    high_value='#2D3ECF',
    y_axis_2_text_color='grey',
    shadow='rgba(0, 0, 0, 0.75)',
    major_line='#2D3ECF',
    minor_line='#B6B2CF',
    main_line='black',
)
def generate_config():
    """Return a fresh dict of the plotly display options passed to every iplot call."""
    config = dict(showLink=False, displayModeBar=False, showAxisRangeEntryBoxes=True)
    return config
def _generate_stock_trace(prices):
    """
    Build the line trace for a price series.

    :param prices: Series of prices; its index supplies the x values
    :return: Scatter trace named 'Index' drawn in the 'main_line' colour
    """
    line_style = {'color': color_scheme['main_line']}
    return go.Scatter(name='Index', x=prices.index, y=prices, line=line_style)
def _generate_buy_annotations(prices, signal):
    """
    Build one 'Long' chart annotation for every entry where the signal equals 1.

    :param prices: Series of prices
    :param signal: Series of signals, compared element-wise against 1 to select prices
    :return: List of annotation dicts, one per selected price
    """
    long_prices = prices[signal == 1]
    # Series.items() replaces Series.iteritems(), which was removed in pandas 2.0.
    return [
        {
            'x': index, 'y': price, 'text': 'Long', 'bgcolor': color_scheme['background_label'],
            'ayref': 'y', 'ax': 0, 'ay': 20}
        for index, price in long_prices.items()]
def _generate_sell_annotations(prices, signal):
    """
    Build one 'Short' chart annotation for every entry where the signal equals -1.

    :param prices: Series of prices
    :param signal: Series of signals, compared element-wise against -1 to select prices
    :return: List of annotation dicts, one per selected price
    """
    short_prices = prices[signal == -1]
    # Series.items() replaces Series.iteritems(), which was removed in pandas 2.0.
    return [
        {
            'x': index, 'y': price, 'text': 'Short', 'bgcolor': color_scheme['background_label'],
            'ayref': 'y', 'ax': 0, 'ay': 160}
        for index, price in short_prices.items()]
def _generate_second_tetration_stock(stock_symbol, dates):
    """
    Build synthetic price data whose level follows the second tetration curve x**x.

    :param stock_symbol: Ticker written into every row
    :param dates: Dates to generate one row for each
    :return: DataFrame with ticker, date, adj_open, adj_close, adj_high and adj_low columns
    """
    n_dates = len(dates)
    n_noise_rows = 5
    # x rises linearly from 1 to 5; x**x is the noise-free price curve.
    x = np.linspace(1, 5, n_dates)
    # One row of uniform noise in [-0.005, 0.005) for each perturbed series.
    noise = ((np.random.rand(n_noise_rows, n_dates) - 0.5) * 0.01)

    stock = pd.DataFrame({
        'ticker': stock_symbol,
        'date': dates,
        'base_line': np.power(x, x)})

    # Row 0 perturbs the base line itself; rows 1-4 perturb the already noisy base line.
    stock['base_line'] = stock['base_line'] + noise[0]*stock['base_line']
    price_columns = ('adj_open', 'adj_close', 'adj_high', 'adj_low')
    for column, column_noise in zip(price_columns, noise[1:]):
        stock[column] = stock['base_line'] + column_noise*stock['base_line']

    # Make sure the high/low always bracket the open and the close.
    stock['adj_high'] = stock[['adj_high', 'adj_open', 'adj_close']].max(axis=1)
    stock['adj_low'] = stock[['adj_low', 'adj_open', 'adj_close']].min(axis=1)

    return stock.drop(columns='base_line')
def generate_tb_sector(dates):
    """
    Generate the synthetic TB sector: one generated stock per name in a fixed list.

    :param dates: Dates that stocks should have market data on
    :return: List of per-stock DataFrames, in the order of the name list
    """
    symbol_length = 6
    stock_names = (
        'kaufmanniana', 'clusiana', 'greigii', 'sylvestris', 'turkestanica', 'linifolia', 'gesneriana',
        'humilis', 'tarda', 'saxatilis', 'dasystemon', 'orphanidea', 'kolpakowskiana', 'praestans',
        'sprengeri', 'bakeri', 'pulchella', 'biflora', 'schrenkii', 'armena', 'vvedenskyi', 'agenensis',
        'altaica', 'urumiensis')

    sector = []
    for stock_name in stock_names:
        # The ticker is the upper-cased first `symbol_length` characters of the name.
        symbol = stock_name[:symbol_length].upper()
        sector.append(_generate_second_tetration_stock(symbol, dates))
    return sector
def plot_stock(prices, title):
    """Render a single price series as a titled line chart."""
    figure = {
        'data': [_generate_stock_trace(prices)],
        'layout': go.Layout(title=title)}
    offline_py.iplot(figure, config=generate_config())
def plot_high_low(prices, lookback_high, lookback_low, title):
    """Render a price series together with its lookback-high and lookback-low series."""
    # (series, legend name, colour key) for the two overlay lines, in draw order.
    overlays = (
        (lookback_high, 'Column lookback_high', 'major_line'),
        (lookback_low, 'Column lookback_low', 'minor_line'))

    traces = [_generate_stock_trace(prices)]
    for series, legend_name, color_key in overlays:
        traces.append(
            go.Scatter(
                x=series.index,
                y=series,
                name=legend_name,
                line={'color': color_scheme[color_key]}))

    offline_py.iplot({'data': traces, 'layout': go.Layout(title=title)}, config=generate_config())
def plot_signal(price, signal, title):
    """Render a price series annotated with its 'Long' and 'Short' signal markers."""
    # Buy annotations come first, then sell annotations.
    annotations = _generate_buy_annotations(price, signal) + _generate_sell_annotations(price, signal)
    figure = {
        'data': [_generate_stock_trace(price)],
        'layout': go.Layout(title=title, annotations=annotations)}
    offline_py.iplot(figure, config=generate_config())
def plot_lookahead_prices(prices, lookahead_price_list, title):
    """
    Render a price series with one extra line per lookahead price series.

    `lookahead_price_list` holds (series, lookahead_days) pairs; each pair gets the
    next colour of a gradient running from 'low_value' to 'high_value'.
    """
    gradient = Color(color_scheme['low_value']).range_to(
        Color(color_scheme['high_value']), len(lookahead_price_list))

    traces = [_generate_stock_trace(prices)]
    traces.extend(
        go.Scatter(
            x=lookahead_prices.index,
            y=lookahead_prices,
            name='{} Day Lookahead'.format(lookahead_days),
            line={'color': str(color)})
        for (lookahead_prices, lookahead_days), color in zip(lookahead_price_list, gradient))

    offline_py.iplot({'data': traces, 'layout': go.Layout(title=title)}, config=generate_config())
def plot_price_returns(prices, lookahead_returns_list, title):
    """
    Render a price series with lookahead returns drawn against a second y axis.

    `lookahead_returns_list` holds (series, lookahead_days) pairs; each pair gets the
    next colour of a gradient running from 'low_value' to 'high_value'.
    """
    axis_text_color = color_scheme['y_axis_2_text_color']
    # Right-hand axis overlaid on the price axis, used by every returns trace.
    returns_axis = {
        'title': 'Returns',
        'titlefont': {'color': axis_text_color},
        'tickfont': {'color': axis_text_color},
        'overlaying': 'y',
        'side': 'right'}

    gradient = Color(color_scheme['low_value']).range_to(
        Color(color_scheme['high_value']), len(lookahead_returns_list))

    traces = [_generate_stock_trace(prices)]
    for (lookahead_returns, lookahead_days), color in zip(lookahead_returns_list, gradient):
        returns_trace = go.Scatter(
            x=lookahead_returns.index,
            y=lookahead_returns,
            name='{} Day Lookahead'.format(lookahead_days),
            line={'color': str(color)},
            yaxis='y2')
        traces.append(returns_trace)

    layout = go.Layout(title=title, yaxis2=returns_axis)
    offline_py.iplot({'data': traces, 'layout': layout}, config=generate_config())
def plot_signal_returns(prices, signal_return_list, titles):
    """
    Render one chart per (signal_return, signal, lookahead_days) entry.

    Each chart shows the price series, the non-zero signal returns on a second
    y axis, and the long/short annotations of that entry's signal. Entries are
    paired with `titles` by position.
    """
    config = generate_config()
    axis_text_color = color_scheme['y_axis_2_text_color']
    # One layout object is reused; its title and annotations are overwritten per chart.
    layout = go.Layout(
        yaxis2={
            'title': 'Signal Returns',
            'titlefont': {'color': axis_text_color},
            'tickfont': {'color': axis_text_color},
            'overlaying': 'y',
            'side': 'right'})
    gradient = Color(color_scheme['low_value']).range_to(
        Color(color_scheme['high_value']), len(signal_return_list))
    stock_trace = _generate_stock_trace(prices)

    for (signal_return, signal, lookahead_days), color, title in zip(signal_return_list, gradient, titles):
        # Only entries with a non-zero signal return are drawn.
        traded_returns = signal_return[signal_return != 0]
        signal_return_trace = go.Scatter(
            x=traded_returns.index,
            y=traded_returns,
            name='{} Day Lookahead'.format(lookahead_days),
            line={'color': str(color)},
            yaxis='y2')

        layout['title'] = title
        layout['annotations'] = (
            _generate_buy_annotations(prices, signal) + _generate_sell_annotations(prices, signal))

        offline_py.iplot({'data': [stock_trace, signal_return_trace], 'layout': layout}, config=config)
def plot_signal_histograms(signal_list, title, subplot_titles):
    """
    Render one histogram per signal DataFrame, side by side in a single row.

    Every DataFrame is stacked into one Series; zeros and NaNs are dropped before
    plotting. All subplots share the same x range (min/max over all stacked
    values) and a fixed y range of 0 to 1500.
    """
    assert len(signal_list) == len(subplot_titles)

    signal_series_list = [signal.stack() for signal in signal_list]
    n_plots = len(signal_series_list)
    combined = pd.concat(signal_series_list)
    x_range = [combined.min(), combined.max()]
    y_range = [0, 1500]

    gradient = Color(color_scheme['low_value']).range_to(Color(color_scheme['high_value']), n_plots)
    fig = py.tools.make_subplots(rows=1, cols=n_plots, subplot_titles=subplot_titles, print_grid=False)
    fig['layout'].update(title=title, showlegend=False)

    # Subplot columns are numbered from 1.
    for column, (signal_series, color) in enumerate(zip(signal_series_list, gradient), 1):
        non_zero = signal_series[signal_series != 0].dropna()
        fig.append_trace(go.Histogram(x=non_zero, marker={'color': str(color)}), 1, column)
        fig['layout']['xaxis{}'.format(column)].update(range=x_range)
        fig['layout']['yaxis{}'.format(column)].update(range=y_range)

    offline_py.iplot(fig, config=generate_config())
def plot_signal_to_normal_histograms(signal_list, title, subplot_titles):
    """
    Render, per signal DataFrame, its histogram next to a sampled normal distribution.

    Every DataFrame is stacked into one Series; zeros and NaNs are dropped. The
    comparison histogram is a random normal sample with the same mean, standard
    deviation and size as the filtered values. All subplots share the same x range
    and a fixed y range of 0 to 1500.
    """
    assert len(signal_list) == len(subplot_titles)

    signal_series_list = [signal.stack() for signal in signal_list]
    n_plots = len(signal_series_list)
    combined = pd.concat(signal_series_list)
    x_range = [combined.min(), combined.max()]
    y_range = [0, 1500]

    fig = py.tools.make_subplots(rows=1, cols=n_plots, subplot_titles=subplot_titles, print_grid=False)
    fig['layout'].update(title=title)

    # Subplot columns are numbered from 1.
    for column, signal_series in enumerate(signal_series_list, 1):
        non_zero = signal_series[signal_series != 0].dropna()
        signal_histogram = go.Histogram(
            x=non_zero,
            marker={'color': color_scheme['low_value']},
            name='Signal Return Distribution',
            showlegend=False)
        normal_sample = np.random.normal(np.mean(non_zero), np.std(non_zero), len(non_zero))
        normal_histogram = go.Histogram(
            x=normal_sample,
            marker={'color': color_scheme['shadow']},
            name='Normal Distribution',
            showlegend=False)

        fig.append_trace(signal_histogram, 1, column)
        fig.append_trace(normal_histogram, 1, column)
        fig['layout']['xaxis{}'.format(column)].update(range=x_range)
        fig['layout']['yaxis{}'.format(column)].update(range=y_range)

    # Show the legend only for the first pair of traces.
    for trace_i in (0, 1):
        fig['data'][trace_i]['showlegend'] = True

    offline_py.iplot(fig, config=generate_config())
# Load the quotes CSV; the 'date' column is parsed as datetimes and no column is used as the index.
df_original = pd.read_csv('./eod-quotemedia.csv', parse_dates=['date'], index_col=False)
# Add TB sector to the market
df = df_original
# The generated TB stocks get one row for each date on which AAPL has a row.
df = pd.concat([df] + generate_tb_sector(df[df['ticker'] == 'AAPL']['date']), ignore_index=True)
# Pivot the long table into date x ticker tables, one per price field.
close = df.reset_index().pivot(index='date', columns='ticker', values='adj_close')
high = df.reset_index().pivot(index='date', columns='ticker', values='adj_high')
low = df.reset_index().pivot(index='date', columns='ticker', values='adj_low')
print('Loaded Data')
# Bare expression with no effect as a script; presumably a notebook cell that displayed the table.
close
# Ticker used for all single-stock example plots below.
apple_ticker = 'AAPL'
plot_stock(close[apple_ticker], '{} Stock'.format(apple_ticker))
def get_high_lows_lookback(high, low, lookback_days):
    """
    Compute the rolling high and low over the previous `lookback_days` rows.

    The current row is excluded from its own window.

    Parameters
    ----------
    high : DataFrame
        High price for each ticker and date
    low : DataFrame
        Low price for each ticker and date
    lookback_days : int
        The number of days to look back

    Returns
    -------
    lookback_high : DataFrame
        Lookback high price for each ticker and date
    lookback_low : DataFrame
        Lookback low price for each ticker and date
    """
    # Shift down by one row so that the window does not include the current day's value.
    previous_high = high.shift(1)
    previous_low = low.shift(1)

    lookback_high = previous_high.rolling(lookback_days).max()
    lookback_low = previous_low.rolling(lookback_days).min()
    return lookback_high, lookback_low
# Check the implementation with the project's test helper.
project_tests.test_get_high_lows_lookback(get_high_lows_lookback)
# Lookback window length, in rows, used for the high/low series below.
lookback_days = 50
lookback_high, lookback_low = get_high_lows_lookback(high, low, lookback_days)
# Plot the AAPL close together with its lookback high and low.
plot_high_low(
close[apple_ticker],
lookback_high[apple_ticker],
lookback_low[apple_ticker],
'High and Low of {} Stock'.format(apple_ticker))
Signal | Condition |
---|---|
-1 | Low > Close Price |
1 | High < Close Price |
0 | Otherwise |
def get_long_short(close, lookback_high, lookback_low):
    """
    Generate the signals long, short, and do nothing.

    The signal is 1 where the close is above the lookback high, -1 where the
    close is below the lookback low, and 0 otherwise (including wherever a
    comparison involves NaN).

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookback_high : DataFrame
        Lookback high price for each ticker and date
    lookback_low : DataFrame
        Lookback low price for each ticker and date

    Returns
    -------
    long_short : DataFrame
        The long, short, and do nothing signals for each ticker and date
    """
    # Use the builtin int: the np.int alias was removed in NumPy 1.24.
    long_signals = (close > lookback_high).astype(int)
    short_signals = (close < lookback_low).astype(int)
    return long_signals - short_signals
# Check the implementation with the project's test helper.
project_tests.test_get_long_short(get_long_short)
# Unfiltered long/short signal for every ticker and date.
signal = get_long_short(close, lookback_high, lookback_low)
# Plot the AAPL close annotated with its long and short signals.
plot_signal(
close[apple_ticker],
signal[apple_ticker],
'Long and Short of {} Stock'.format(apple_ticker))
前面的步骤产生了许多重复的信号！如果我们已经做空一只股票，再出现一个做空该股票的信号对策略没有帮助；做多也是同样的道理。
实现 `filter_signals`，过滤掉在 `lookahead_days` 天内重复的做多或者做空信号：如果窗口内前面已经有相同的信号，就把当前信号改为 0。例如，假设某只股票的信号时间序列如下
[1, 0, 1, 0, 1, 0, -1, -1]
以 `lookahead_days` 参数为 3 运行 `filter_signals`，应该返回下面的序列
[1, 0, 0, 0, 1, 0, -1, 0]
def clear_signals(signals, window_size):
    """
    Clear out signals in a Series of just long or short signals.

    Remove the number of signals down to 1 within the window size time period:
    a signal is kept only when none of the previous `window_size` kept values
    is non-zero.

    Parameters
    ----------
    signals : Pandas Series
        The long, short, or do nothing signals
    window_size : int
        The number of days to have a single signal

    Returns
    -------
    signals : Pandas Series
        Signals with the signals removed from the window size
    """
    # Start with a buffer of window_size zeros so the first entries have a full
    # (empty) history to look back on.
    clean_signals = [0] * window_size

    for signal_i, current_signal in enumerate(signals):
        # Because of the buffer, this slice holds the kept values of the
        # previous window_size days.
        has_past_signal = any(clean_signals[signal_i:signal_i + window_size])
        # Keep the current signal only if the window before it is empty.
        clean_signals.append(0 if has_past_signal else current_signal)

    # Remove buffer
    clean_signals = clean_signals[window_size:]

    # Return the signals as a Series of ints on the original index.
    # Use the builtin int: the np.int alias was removed in NumPy 1.24.
    return pd.Series(np.array(clean_signals).astype(int), signals.index)
def filter_signals(signal, lookahead_days):
    """
    Filter out repeated signals in a DataFrame.

    Long and short signals are cleared independently per ticker with
    `clear_signals` and then added back together.

    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_days : int
        The number of days to look ahead

    Returns
    -------
    filtered_signal : DataFrame
        The filtered long, short, and do nothing signals for each ticker and date
    """
    filtered_signal = signal.copy()

    # DataFrame.items() replaces DataFrame.iteritems(), which was removed in pandas 2.0.
    for ticker, ticker_signal in signal.items():
        # Zero out positive values to leave only the short signals, and vice versa.
        short_signals = ticker_signal.clip(upper=0)
        long_signals = ticker_signal.clip(lower=0)
        filtered_signal[ticker] = (
            clear_signals(short_signals, lookahead_days) + clear_signals(long_signals, lookahead_days))

    return filtered_signal
# Check the implementation with the project's test helper.
project_tests.test_filter_signals(filter_signals)

# Filter the raw signal with 5, 10 and 20 day windows.
signal_5 = filter_signals(signal, 5)
signal_10 = filter_signals(signal, 10)
signal_20 = filter_signals(signal, 20)

# One annotated AAPL chart per window length.
for signal_data, signal_days in ((signal_5, 5), (signal_10, 10), (signal_20, 20)):
    chart_title = 'Long and Short of {} Stock with {} day signal window'.format(apple_ticker, signal_days)
    plot_signal(close[apple_ticker], signal_data[apple_ticker], chart_title)
def get_lookahead_prices(close, lookahead_days):
    """
    Return, for every row, the close price found `lookahead_days` rows later.

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_days : int
        The number of days to look ahead

    Returns
    -------
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date
    """
    # A negative shift moves later rows up, leaving NaN in the last rows.
    lookahead_prices = close.shift(-lookahead_days)
    return lookahead_prices
# Check the implementation with the project's test helper.
project_tests.test_get_lookahead_prices(get_lookahead_prices)
# Lookahead prices for 5, 10 and 20 days.
lookahead_5 = get_lookahead_prices(close, 5)
lookahead_10 = get_lookahead_prices(close, 10)
lookahead_20 = get_lookahead_prices(close, 20)
# Plot only rows 150-249 of AAPL.
plot_lookahead_prices(
close[apple_ticker].iloc[150:250],
[
(lookahead_5[apple_ticker].iloc[150:250], 5),
(lookahead_10[apple_ticker].iloc[150:250], 10),
(lookahead_20[apple_ticker].iloc[150:250], 20)],
'5, 10, and 20 day Lookahead Prices for Slice of {} Stock'.format(apple_ticker))
def get_return_lookahead(close, lookahead_prices):
    """
    Compute log returns from the signal day's close to the lookahead price.

    Parameters
    ----------
    close : DataFrame
        Close price for each ticker and date
    lookahead_prices : DataFrame
        The lookahead prices for each ticker and date

    Returns
    -------
    lookahead_returns : DataFrame
        The lookahead log returns for each ticker and date
    """
    log_future_price = np.log(lookahead_prices)
    log_current_price = np.log(close)
    # The log return is the difference of the log prices.
    return log_future_price - log_current_price
# Check the implementation with the project's test helper.
project_tests.test_get_return_lookahead(get_return_lookahead)
# Lookahead log returns for 5, 10 and 20 days.
price_return_5 = get_return_lookahead(close, lookahead_5)
price_return_10 = get_return_lookahead(close, lookahead_10)
price_return_20 = get_return_lookahead(close, lookahead_20)
# Plot only rows 150-249 of AAPL.
plot_price_returns(
close[apple_ticker].iloc[150:250],
[
(price_return_5[apple_ticker].iloc[150:250], 5),
(price_return_10[apple_ticker].iloc[150:250], 10),
(price_return_20[apple_ticker].iloc[150:250], 20)],
'5, 10, and 20 day Lookahead Returns for Slice {} Stock'.format(apple_ticker))
def get_signal_return(signal, lookahead_returns):
    """
    Compute the signal returns as the element-wise product of signal and return.

    Parameters
    ----------
    signal : DataFrame
        The long, short, and do nothing signals for each ticker and date
    lookahead_returns : DataFrame
        The lookahead log returns for each ticker and date

    Returns
    -------
    signal_return : DataFrame
        Signal returns for each ticker and date
    """
    # A signal of 0 zeroes the return and a signal of -1 flips its sign.
    signal_return = signal * lookahead_returns
    return signal_return
# Check the implementation with the project's test helper.
project_tests.test_get_signal_return(get_signal_return)
# Title template, filled with (lookahead days, ticker).
title_string = '{} day LookaheadSignal Returns for {} Stock'
# Each filtered signal is paired with the return of the same lookahead length.
signal_return_5 = get_signal_return(signal_5, price_return_5)
signal_return_10 = get_signal_return(signal_10, price_return_10)
signal_return_20 = get_signal_return(signal_20, price_return_20)
# One AAPL chart per lookahead length.
plot_signal_returns(
close[apple_ticker],
[
(signal_return_5[apple_ticker], signal_5[apple_ticker], 5),
(signal_return_10[apple_ticker], signal_10[apple_ticker], 10),
(signal_return_20[apple_ticker], signal_20[apple_ticker], 20)],
[title_string.format(5, apple_ticker), title_string.format(10, apple_ticker), title_string.format(20, apple_ticker)])
# Disabled three-panel version; the two calls below plot the histograms in pairs instead.
# plot_signal_histograms(
# [signal_return_5, signal_return_10, signal_return_20],
# 'Signal Return',
# ('5 Days', '10 Days', '20 Days'))
# 5 day vs 10 day signal returns.
plot_signal_histograms(
[signal_return_5, signal_return_10],
'Signal Return',
('5 Days', '10 Days'))
# 10 day vs 20 day signal returns.
plot_signal_histograms(
[ signal_return_10, signal_return_20],
'Signal Return',
( '10 Days', '20 Days'))
我们发现10天和20天的柱形图有异常点。为了更好地可视化异常点,我们比较5天、10天和20天的信号收益和相同方差与均值的正态分布
# Compare each lookahead's signal returns against a sampled normal distribution, one chart each.
plot_signal_to_normal_histograms(
[signal_return_5,],
'Signal Return',
('5 Days',))
plot_signal_to_normal_histograms(
[signal_return_10,],
'Signal Return',
('10 Days',))
plot_signal_to_normal_histograms(
[signal_return_20,],
'Signal Return',
('20 Days',))
# Filter out returns that don't have a long or short signal.
# stack() turns each table into a single Series.
long_short_signal_returns_5 = signal_return_5[signal_5 != 0].stack()
long_short_signal_returns_10 = signal_return_10[signal_10 != 0].stack()
long_short_signal_returns_20 = signal_return_20[signal_20 != 0].stack()
# Get just ticker and signal return
# After reset_index(), positions 1 and 2 are kept and renamed.
long_short_signal_returns_5 = long_short_signal_returns_5.reset_index().iloc[:, [1,2]]
long_short_signal_returns_5.columns = ['ticker', 'signal_return']
long_short_signal_returns_10 = long_short_signal_returns_10.reset_index().iloc[:, [1,2]]
long_short_signal_returns_10.columns = ['ticker', 'signal_return']
long_short_signal_returns_20 = long_short_signal_returns_20.reset_index().iloc[:, [1,2]]
long_short_signal_returns_20.columns = ['ticker', 'signal_return']
# View some of the data
# Bare expression with no effect as a script; presumably a notebook cell that displayed the rows.
long_short_signal_returns_5.head(10)
下面实现函数 `calculate_kstest`，计算每只股票的信号收益与正态分布之间的 Kolmogorov-Smirnov 检验（KS test），即检查每只股票的收益与正态分布之间的 KS 检验结果。使用 `scipy.stats.kstest` 执行 KS 检验；计算信号收益的标准差时，记得把 delta 自由度（delta degrees of freedom，即 `ddof` 参数）设置为 0。
from scipy.stats import kstest
def calculate_kstest(long_short_signal_returns):
    """
    Calculate the KS-Test against the signal returns with a long or short signal.

    Each ticker's returns are tested against a normal distribution whose mean
    and standard deviation (ddof=0) are taken over the returns of all tickers.

    Parameters
    ----------
    long_short_signal_returns : DataFrame
        The signal returns which have a signal.
        This DataFrame contains two columns, "ticker" and "signal_return"

    Returns
    -------
    ks_values : Pandas Series
        KS statistic for all the tickers
    p_values : Pandas Series
        P value for all the tickers
    """
    # Take the statistics from the numeric column only: calling mean()/std() on
    # the whole frame also hits the string 'ticker' column, which raises on
    # pandas >= 2.0 and otherwise yields a length-1 Series instead of a scalar.
    all_returns = long_short_signal_returns['signal_return']
    normal_args = (all_returns.mean(), all_returns.std(ddof=0))

    ks_values = {}
    p_values = {}
    for ticker, ticker_returns in long_short_signal_returns.groupby('ticker'):
        ks_value, p_value = kstest(ticker_returns['signal_return'].values, 'norm', args=normal_args)
        ks_values[ticker] = ks_value
        p_values[ticker] = p_value

    # An explicit dtype keeps the result float even when there are no tickers.
    return pd.Series(ks_values, dtype=float), pd.Series(p_values, dtype=float)
# Check the implementation with the project's test helper.
project_tests.test_calculate_kstest(calculate_kstest)
# KS statistic and p-value per ticker for each lookahead length.
ks_values_5, p_values_5 = calculate_kstest(long_short_signal_returns_5)
ks_values_10, p_values_10 = calculate_kstest(long_short_signal_returns_10)
ks_values_20, p_values_20 = calculate_kstest(long_short_signal_returns_20)
# Print the first ten entries of the 5 day results.
print('ks_values_5')
print(ks_values_5.head(10))
print('p_values_5')
print(p_values_5.head(10))
根据 KS 值和 p 值的结果，我们可以找出哪些股票是异常的。实现 `find_outliers` 函数，找出同时满足下面两个条件的异常股票：
- p 值小于 `pvalue_threshold`。
- KS 值大于 `ks_threshold`。
def find_outliers(ks_values, p_values, ks_threshold, pvalue_threshold=0.05):
    """
    Find outlying symbols using KS values and P-values.

    A symbol is an outlier when its KS value is above `ks_threshold` and its
    p-value is below `pvalue_threshold`.

    Parameters
    ----------
    ks_values : Pandas Series
        KS statistic for all the tickers
    p_values : Pandas Series
        P value for all the tickers
    ks_threshold : float
        The threshold for the KS statistic
    pvalue_threshold : float
        The threshold for the p-value

    Returns
    -------
    outliers : set of str
        Symbols that are outliers
    """
    high_ks_symbols = set(ks_values[ks_values > ks_threshold].index)
    low_p_symbols = set(p_values[p_values < pvalue_threshold].index)
    # Both conditions must hold, so intersect the two symbol sets.
    return high_ks_symbols & low_p_symbols
# Check the implementation with the project's test helper.
project_tests.test_find_outliers(find_outliers)
# KS threshold; the p-value threshold is left at the function's default.
ks_threshold = 0.8
outliers_5 = find_outliers(ks_values_5, p_values_5, ks_threshold)
outliers_10 = find_outliers(ks_values_10, p_values_10, ks_threshold)
outliers_20 = find_outliers(ks_values_20, p_values_20, ks_threshold)
# A ticker flagged for any lookahead length counts as an outlier.
outlier_tickers = outliers_5.union(outliers_10).union(outliers_20)
print('{} Outliers Found:\n{}'.format(len(outlier_tickers), ', '.join(list(outlier_tickers))))
# Every close-price column that was not flagged.
good_tickers = list(set(close.columns) - outlier_tickers)
# Re-plot the comparison histograms using only the non-outlier tickers.
plot_signal_to_normal_histograms(
[signal_return_5[good_tickers],],
'Signal Return Without Outliers',
('5 Days',))
plot_signal_to_normal_histograms(
[signal_return_10[good_tickers],],
'Signal Return Without Outliers',
('10 Days',))
plot_signal_to_normal_histograms(
[signal_return_20[good_tickers],],
'Signal Return Without Outliers',
('20 Days',))