Beating Traditional Fine-Tuning with Prompt-Based Methods
1. Preface
Prompting is a relatively new paradigm aimed mainly at few-shot and zero-shot settings. Current prompts fall into two families, discrete and continuous; I covered continuous (soft) prompts in my previous article. In my experience, continuous prompts do not work as well as discrete ones, and hand-designed templates are also more interpretable. In the Kaggle Feedback Prize - Predicting Effective Arguments competition, a discrete template gave me a clear improvement over plain fine-tuning. Note, however, that the method described here mainly helps when fine-tuning on few-shot or full datasets; it is not suited to zero-shot learning.
Previous article: 归来仍是少年:提示学习soft prompt浅尝
2. Introduction to Prompts
Prompting builds on the masked language modeling (MLM) pre-training task of BERT-style models: during pre-training, 15% of the tokens are randomly masked and the model learns to predict them, which teaches it contextual semantics. A prompt is a manually constructed template that steers the model toward the prior knowledge learned during large-scale pre-training, turning the BERT fine-tuning classification task back into an MLM task.
Standard fine-tuning input example:
[CLS] The sun is out today and it is bright and sunny. [SEP]
Prompt input example:
[CLS] The weather today is [MASK]. [SEP] The sun is out today and it is bright and sunny. [SEP]
With standard fine-tuning, the model learns the mapping to the weather label purely from the data labels: for text classification we take the [CLS] vector and pass it through a fully connected layer to predict the label. With a prompt, we instead build a prompt sentence, take the prediction at the [MASK] position, and map it to a label. This stays consistent with the MLM task and therefore makes better use of the knowledge gained during pre-training. Because the template is hand-crafted, results depend heavily on template quality: different templates can differ by several points of accuracy.
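To make the contrast concrete, here is a minimal sketch of the two input formats, assuming a BERT-style checkpoint from Hugging Face transformers (the model name and the example sentence are purely illustrative):

```python
from transformers import AutoTokenizer

# Illustrative only: any BERT-style checkpoint with [CLS]/[SEP]/[MASK] works the same way.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

text = "The sun is out today and it is bright and sunny."

# Standard fine-tuning: [CLS] text [SEP]; a classifier head sits on top of the [CLS] vector.
finetune_inputs = tokenizer(text)

# Prompt-style: prepend a cloze template; the label is read off the [MASK] position.
template = f"The weather today is {tokenizer.mask_token}."
prompt_inputs = tokenizer(template, text)

print(tokenizer.convert_ids_to_tokens(finetune_inputs["input_ids"]))
print(tokenizer.convert_ids_to_tokens(prompt_inputs["input_ids"]))
```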
Hand-crafted templates
Prompting began with hand-crafted templates, which are usually designed from human linguistic knowledge and aim to be fluent and effective. For example, Petroni et al. hand-designed cloze templates for the knowledge-probing task in the well-known LAMA dataset, and Brown et al. designed prefix templates for question answering, translation, and probing tasks. Hand-crafted templates are intuitive, but they require extensive experimentation, experience, and linguistic expertise, which makes them costly.
Automatically learned templates
To address these drawbacks, much research explores learning suitable templates automatically. Automatically learned templates fall into two broad families: discrete prompts and continuous prompts. Discrete methods include Prompt Mining, Prompt Paraphrasing, Gradient-based Search, Prompt Generation, and Prompt Scoring; continuous methods include Prefix Tuning, Tuning Initialized with Discrete Prompts, and Hard-Soft Prompt Hybrid Tuning.
3. Experimenting with Prompts in the Kaggle Feedback Prize Competition and Improving the Method
I had seen a grandmaster on the Kaggle forums win a gold medal with a prompt-based method, so I decided to try prompting myself rather than fall behind on the technique.
I found a relevant prompt paper and, following its ideas, made some improvements that make the method easier and faster to implement. The PET paper targets few-shot text classification and significantly improves few-shot performance.
Paper title:
Exploiting Cloze Questions for Few Shot Text Classification and Natural Language Inference
The PLM task
The paper converts text classification into a cloze-filling task, similar to reading comprehension but solved with the (masked) language model. The difference from reading-comprehension cloze is that there the answer candidates come from the passage, whereas for MLM every token in BERT's vocabulary is a candidate for the masked position. So when building the template we must make sure the tokens predicted at [MASK] are tied to the labels of our classification task. A template is arguably a good one if, even in the zero-shot setting, the tokens it predicts at [MASK] are already close to the downstream labels.
Briefly, the Feedback Prize task: students write argumentative elements about an essay, and each element is labeled with one of three classes: Adequate, Effective, or Ineffective.
How do we build an effective template for this task? Here is my trick.
The simplest option is to use the hosted inference API on Hugging Face Models.
(Figure: Hugging Face Models inference API)
Pick any model from the hub to experiment with; I chose roberta-base.
The competition asks us to judge whether a student's argument about an essay is effective, so I built this template:
student argument is <mask>.
(Figure: roberta-base fill-mask predictions)
The figure shows the top-5 tokens predicted by roberta-base: valid, weak, strong, true, and sound.
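The same probe can also be run locally with the transformers fill-mask pipeline instead of the hosted widget; a minimal sketch, assuming roberta-base is available (the exact top-k tokens and scores may differ slightly from the screenshot):

```python
from transformers import pipeline

# Roughly equivalent to the hosted inference widget used above.
fill_mask = pipeline("fill-mask", model="roberta-base")

# roberta-base uses "<mask>" as its mask token.
for pred in fill_mask("student argument is <mask>.", top_k=5):
    print(pred["token_str"], round(pred["score"], 4))
```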
Mapping between the top-3 predicted [MASK] tokens and the labels:
valid ---> Effective
weak ---> Ineffective
strong ---> Adequate
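A practical detail when picking label words: each one should ideally map to a single token in the model's vocabulary, otherwise a single [MASK] cannot express it. A quick check, assuming roberta-base (for RoBERTa's BPE vocabulary the leading space matters):

```python
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("roberta-base")

# With RoBERTa's BPE vocabulary, " valid" (with a leading space) and "valid" tokenize differently.
for word in [" valid", " weak", " strong"]:
    ids = tokenizer(word, add_special_tokens=False)["input_ids"]
    print(repr(word), ids, "single token" if len(ids) == 1 else "multiple tokens")
```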
With a prompt plus a pre-trained model, fine-tuning on the data builds a strong association between the token predicted at [MASK] in each sentence and the sentence's label, which is how the text-classification task is converted into a masked-LM task.
The standard prompt approach:
The usual recipe is to take the word predicted at [MASK] after fine-tuning and look it up in the predefined label-mapping dictionary to obtain the predicted class; back-propagation during fine-tuning updates the model weights so the prediction at the [MASK] position becomes more accurate. The upside is that this also works zero-shot; the downside is that finding the label-mapping words is a bit of a hassle.
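A minimal sketch of this standard, PET-style recipe: score only the verbalizer tokens at the [MASK] position and take the best one. The model name, label words, and example text below simply mirror the probe above; they are illustrative, not the competition code:

```python
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM

tokenizer = AutoTokenizer.from_pretrained("roberta-base")
model = AutoModelForMaskedLM.from_pretrained("roberta-base")

# Verbalizer: one label word per class (leading space for RoBERTa's BPE vocabulary).
label_words = {"Effective": " valid", "Ineffective": " weak", "Adequate": " strong"}
label_ids = {label: tokenizer(word, add_special_tokens=False)["input_ids"][0]
             for label, word in label_words.items()}

text = f"student argument is {tokenizer.mask_token}. Some example discourse text."
inputs = tokenizer(text, return_tensors="pt")
mask_pos = (inputs["input_ids"][0] == tokenizer.mask_token_id).nonzero().item()

with torch.no_grad():
    logits = model(**inputs).logits          # [1, seq_len, vocab_size]

mask_logits = logits[0, mask_pos]            # scores over the whole vocabulary
scores = {label: mask_logits[token_id].item() for label, token_id in label_ids.items()}
print(max(scores, key=scores.get), scores)
```

Fine-tuning this setup means training the MLM head so that, for each training sentence, the gold label's verbalizer token gets the highest score at the [MASK] position.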
My improved prompt approach:
My approach also evolves from the PET paper, but it removes the hassle of building a label mapping in advance. The prediction target is exactly the same as in ordinary text classification; the difference is that the vector I use is not the [CLS] vector but the vector at the [MASK] position. The template only needs a single mask token, since all the model has to learn is the mapping at that position.
(Figure: schematic of prompt-based prediction)
I leave the label mapping to a fully connected layer: the DeBERTa output vector at the [MASK] position is fed into the linear layer, and during fine-tuning its predictions are pushed toward the class labels. The drawback is that this cannot be applied zero-shot; it only suits few-shot and full-data fine-tuning.
4. Prompt Implementation
Template insertion
```python
tokenizer = AutoTokenizer.from_pretrained(CFG.model)
collate_fn = Collate(tokenizer, isTrain=True)
df = pd.read_csv("./feedback/train.csv")
df['essay'] = df['essay_id'].apply(fetchEssay)
query = 'student argument is ' + str(tokenizer.mask_token) + '.'
new_label = {"Ineffective": 0, "Adequate": 1, "Effective": 2}
df['discourse_effectiveness'] = df['discourse_effectiveness'].apply(lambda x: new_label[x])
# Prepend the prompt, then discourse_type, discourse_text and the full essay, separated by [SEP].
df['text'] = df.apply(lambda x: query + tokenizer.sep_token + x['discourse_type'] + tokenizer.sep_token
                      + x['discourse_text'] + tokenizer.sep_token + x['essay'], axis=1)
# The prompt sits at the front of every sample, so the mask index computed from the
# query alone is also the mask index in the full input.
mask_index = tokenizer.encode_plus(query,
                                   add_special_tokens=True,
                                   max_length=CFG.max_len,
                                   truncation=True,
                                   return_offsets_mapping=False)["input_ids"].index(tokenizer.mask_token_id)
print('mask position:', mask_index)
```
Model
```python
class FeedBackModel(nn.Module):
    def __init__(self, model_path):
        super(FeedBackModel, self).__init__()
        self.config = AutoConfig.from_pretrained(model_path)
        self.model = AutoModel.from_pretrained(model_path)
        self.linear = nn.Linear(self.config.hidden_size, CFG.target_size)

    def forward(self, ids, mask, mask_index):
        # Classify from the hidden state at the [MASK] position instead of [CLS].
        x = self.model(ids, mask)[0][:, mask_index, :]
        pred = self.linear(x)
        return pred
```
Full text-classification code
```python
import gc
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import sys
import time
import pickle
import random
import numpy as np
import pandas as pd
# from tqdm.notebook import tqdm
from tqdm import tqdm
from sklearn.metrics import log_loss
from sklearn.model_selection import StratifiedKFold,GroupKFold
from tools import StratifiedGroupKFold
import torch
import transformers
import torch.nn as nn
import torch.nn.functional as F
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader
from text_unidecode import unidecode
from typing import Dict, List, Tuple
from torchcontrib.optim import SWA
import codecs
from transformers import AutoModel, AutoTokenizer, AdamW, get_linear_schedule_with_warmup, get_cosine_schedule_with_warmup,AutoConfig
import warnings
warnings.simplefilter('ignore')
def fetchEssay(essay_id: str):
    """
    Read the text file of the specific essay_id
    """
    essay_path = os.path.join('./feedback/train/', essay_id + '.txt')
    essay_text = open(essay_path, 'r').read()
    return essay_text

def seed_everything(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)

class CFG:
    wandb=True
    competition='PPPM'
    _wandb_kernel='nakama'
    debug=False
    apex=False
    print_freq=100
    num_workers=4
    model='./pretrain_model/deberta_v3_large'
    scheduler='cosine' # ['linear', 'cosine']
    batch_scheduler=True
    num_cycles=0.5
    num_warmup_steps=0
    epochs=1
    encoder_lr=1e-5
    decoder_lr=1e-5
    min_lr=1e-6
    eps=1e-6
    betas=(0.9, 0.999)
    batch_size=8
    fc_dropout=0.1
    target_size=3
    max_len=512
    weight_decay=0.01
    gradient_accumulation_steps=1
    max_grad_norm=1000
    seed=42
    n_fold=5
    trn_fold=[i for i in range(n_fold)]
    train=True

seed_everything(CFG.seed)

class callback:
    def __init__(self):
        self.loss = list()
        self.model = list()

    def put(self, model, loss):
        self.loss.append(loss)
        self.model.append(model)

    def get_model(self):
        ind = np.argmin(self.loss)
        return self.model[ind]

class FeedBackDataset(Dataset):
    def __init__(self, data, model_path, is_test=False):
        self.data = data
        self.is_test = is_test
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

    def __getitem__(self, idx):
        text = self.data['text'].values[idx]
        if not self.is_test:
            target_value = self.data[y_cols].values[idx]
        inputs = self.tokenizer.encode_plus(text,
                                            add_special_tokens=True,
                                            max_length=CFG.max_len,
                                            truncation=True,
                                            return_offsets_mapping=False)
        if self.is_test:
            return {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask'],
            }
        else:
            targets = torch.FloatTensor(target_value)
            return {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask'],
                'targets': targets
            }

    def __len__(self):
        return len(self.data)

def softmax(z):
    assert len(z.shape) == 2
    s = np.max(z, axis=1)
    s = s[:, np.newaxis]  # necessary step to do broadcasting
    e_x = np.exp(z - s)
    div = np.sum(e_x, axis=1)
    div = div[:, np.newaxis]  # ditto
    return e_x / div

def monitor_metrics(outputs, targets):
    device = targets.get_device()
    mll = log_loss(
        targets.cpu().detach().numpy(),
        softmax(outputs.cpu().detach().numpy()),
        labels=[0, 1, 2],
    )
    return mll

class Collate:
    def __init__(self, tokenizer, isTrain=True):
        self.tokenizer = tokenizer
        self.isTrain = isTrain
        # self.args = args

    def __call__(self, batch):
        output = dict()
        output["input_ids"] = [sample["input_ids"] for sample in batch]
        output["attention_mask"] = [sample["attention_mask"] for sample in batch]
        if self.isTrain:
            output["targets"] = [sample["targets"] for sample in batch]
        # calculate max token length of this batch
        batch_max = max([len(ids) for ids in output["input_ids"]])
        # add padding
        if self.tokenizer.padding_side == "right":
            output["input_ids"] = [s + (batch_max - len(s)) * [self.tokenizer.pad_token_id] for s in output["input_ids"]]
            output["attention_mask"] = [s + (batch_max - len(s)) * [0] for s in output["attention_mask"]]
        else:
            output["input_ids"] = [(batch_max - len(s)) * [self.tokenizer.pad_token_id] + s for s in output["input_ids"]]
            output["attention_mask"] = [(batch_max - len(s)) * [0] + s for s in output["attention_mask"]]
        # convert to tensors
        output["input_ids"] = torch.tensor(output["input_ids"], dtype=torch.long)
        output["attention_mask"] = torch.tensor(output["attention_mask"], dtype=torch.long)
        if self.isTrain:
            output["targets"] = torch.tensor(output["targets"], dtype=torch.long)
        return output
tokenizer = AutoTokenizer.from_pretrained(CFG.model)
collate_fn = Collate(tokenizer, isTrain=True)
df = pd.read_csv("./feedback/train.csv")
df['essay'] = df['essay_id'].apply(fetchEssay)
query = 'student argument is '+str(tokenizer.mask_token)+'.'
new_label = {"Ineffective": 0, "Adequate": 1, "Effective": 2}
df['discourse_effectiveness'] = df['discourse_effectiveness'].apply(lambda x: new_label[x] )
# Prepend the prompt, then discourse_type, discourse_text and the full essay, separated by [SEP].
df['text'] = df.apply(lambda x: query + tokenizer.sep_token + x['discourse_type'] + tokenizer.sep_token
                      + x['discourse_text'] + tokenizer.sep_token + x['essay'], axis=1)
# The prompt sits at the front of every sample, so the mask index computed from the
# query alone is also the mask index in the full input.
mask_index = tokenizer.encode_plus(query,
                                   add_special_tokens=True,
                                   max_length=CFG.max_len,
                                   truncation=True,
                                   return_offsets_mapping=False)["input_ids"].index(tokenizer.mask_token_id)
print('mask position:', mask_index)
print(tokenizer.tokenize(query))
print(df.head())
OUTPUT_DIR = './save_model/'
os.system('rm -rf '+OUTPUT_DIR+'*')
y_cols = ['discourse_effectiveness']
class FeedBackModel(nn.Module):
    def __init__(self, model_path):
        super(FeedBackModel, self).__init__()
        self.config = AutoConfig.from_pretrained(model_path)
        self.model = AutoModel.from_pretrained(model_path)
        self.linear = nn.Linear(self.config.hidden_size, CFG.target_size)

    def forward(self, ids, mask):
        # Classify from the hidden state at the [MASK] position (module-level mask_index) instead of [CLS].
        x = self.model(ids, mask)[0][:, mask_index, :]
        pred = self.linear(x)
        return pred

class FGM():
    def __init__(self, model):
        self.model = model
        self.backup = {}

    def attack(self, epsilon=0.5, emb_name='word_embeddings'):  # DebertaV2Embeddings.word_embedding
        # emb_name should be the name of the embedding parameter in your model,
        # e.g. self.emb = nn.Embedding(5000, 100)
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                self.backup[name] = param.data.clone()
                norm = torch.norm(param.grad)  # L2 norm by default
                if norm != 0:
                    r_at = epsilon * param.grad / norm
                    param.data.add_(r_at)

    def restore(self, emb_name='word_embeddings'):
        # emb_name should be the name of the embedding parameter in your model
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                assert name in self.backup
                param.data = self.backup[name]
        self.backup = {}
kf = StratifiedGroupKFold(n_splits=CFG.n_fold)
local_cv_loss = 0
for i, (train_idx, valid_idx) in enumerate(kf.split(X=df,y=df['discourse_effectiveness'], groups=df['essay_id'])):
    # if i+1 not in [4]:
    #     continue
    # if i+1 == 1:
    #     CFG.encoder_lr = 8e-6
    #     CFG.decoder_lr = 8e-6
    print('*'*50 + f'fold {i+1}' + '*'*50)
    gc.collect()
    cb = callback()
    train_loader = torch.utils.data.DataLoader(FeedBackDataset(df.loc[train_idx, :].reset_index(drop=True), CFG.model), batch_size=CFG.batch_size, shuffle=True, num_workers=4, collate_fn=collate_fn)
    val_loader = torch.utils.data.DataLoader(FeedBackDataset(df.loc[valid_idx, :].reset_index(drop=True), CFG.model), batch_size=CFG.batch_size, shuffle=False, num_workers=4, collate_fn=collate_fn)
    net = FeedBackModel(CFG.model)
    net.cuda()
    fgm = FGM(net)
    loss_fn = torch.nn.CrossEntropyLoss()

    def get_optimizer_params(model, encoder_lr, decoder_lr, weight_decay=0.0):
        no_decay = ["bias", "LayerNorm.bias", "LayerNorm.weight"]
        optimizer_parameters = [
            {'params': [p for n, p in model.model.named_parameters() if not any(nd in n for nd in no_decay)],
             'lr': encoder_lr, 'weight_decay': weight_decay},
            {'params': [p for n, p in model.model.named_parameters() if any(nd in n for nd in no_decay)],
             'lr': encoder_lr, 'weight_decay': 0.0},
            {'params': [p for n, p in model.named_parameters() if "model" not in n],
             'lr': decoder_lr, 'weight_decay': 0.0}
        ]
        return optimizer_parameters

    optimizer_parameters = get_optimizer_params(net,
                                                encoder_lr=CFG.encoder_lr,
                                                decoder_lr=CFG.decoder_lr,
                                                weight_decay=CFG.weight_decay)
    optimizer = AdamW(net.parameters(), lr=CFG.encoder_lr, eps=CFG.eps, betas=CFG.betas)
    num_train_optimization_steps = int(CFG.epochs * len(train_loader) / CFG.gradient_accumulation_steps)

    # ====================================================
    # scheduler
    # ====================================================
    def get_scheduler(cfg, optimizer, num_train_steps):
        if cfg.scheduler == 'linear':
            scheduler = get_linear_schedule_with_warmup(
                optimizer, num_warmup_steps=cfg.num_warmup_steps, num_training_steps=num_train_steps
            )
        elif cfg.scheduler == 'cosine':
            scheduler = get_cosine_schedule_with_warmup(
                optimizer, num_warmup_steps=cfg.num_warmup_steps, num_training_steps=num_train_steps, num_cycles=cfg.num_cycles
            )
        return scheduler

    scheduler = get_scheduler(CFG, optimizer, num_train_optimization_steps)
    scaler = torch.cuda.amp.GradScaler()
    best_log_loss = float('inf')
    for epoch in range(CFG.epochs):
        start_time = time.time()
        avg_loss = 0.0
        net.train()
        tbar = tqdm(train_loader, file=sys.stdout)
        loss_list = []
        val_loss_list = []
        val_log_loss_list = []
        for step, data in enumerate(tbar):
            # get the inputs
            input_ids = data['input_ids'].cuda()
            input_masks = data['attention_mask'].cuda()
            targets = data['targets'].long().view(-1).cuda()
            with torch.cuda.amp.autocast():
                pred = net(input_ids, input_masks)
                loss = loss_fn(pred, targets)
                loss = loss / CFG.gradient_accumulation_steps
            scaler.scale(loss).backward()
            with torch.cuda.amp.autocast():
                fgm.attack()  # add an adversarial perturbation to the word embeddings (model.embeddings.word_embeddings)
                pred = net(input_ids, input_masks)
                loss_adv = loss_fn(pred, targets) / CFG.gradient_accumulation_steps
            scaler.scale(loss_adv).backward()
            fgm.restore()  # restore the original embedding weights
            if (step + 1) % CFG.gradient_accumulation_steps == 0 or step == len(tbar) - 1:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
                scheduler.step()
            loss_list.append(loss.detach().cpu().item())
            avg_loss = np.round(np.mean(loss_list), 4)
            tbar.set_description(f"Epoch {epoch + 1} Loss: {avg_loss} lr: {scheduler.get_last_lr()}")
        net.eval()
        avg_val_loss = float('inf')
        avg_val_log_loss = float('inf')
        tbar_val = tqdm(val_loader, file=sys.stdout)
        for step, data in enumerate(tbar_val):
            # get the inputs
            input_ids = data['input_ids'].cuda()
            input_masks = data['attention_mask'].cuda()
            targets = data['targets'].long().view(-1).cuda()
            with torch.no_grad():
                pred = net(input_ids, input_masks)
                loss = loss_fn(pred, targets)
            val_loss_list.append(loss.detach().cpu().item())
            avg_val_loss = np.round(np.mean(val_loss_list), 4)
            val_log_loss = monitor_metrics(pred, targets)
            val_log_loss_list.append(val_log_loss)
            avg_val_log_loss = np.round(np.mean(val_log_loss_list), 4)
            tbar_val.set_description(f"Epoch {epoch + 1} Loss: {avg_val_loss:.4f} val_log_loss: {avg_val_log_loss:.4f}")
        if best_log_loss > avg_val_log_loss:
            best_log_loss = avg_val_log_loss
            torch.save({'model': net.state_dict()},
                       OUTPUT_DIR + f"{CFG.model.split('/')[-1]}_fold{i}_best.pth")
            print(f'Epoch {epoch+1} Loss: {avg_val_loss:.4f} val_log_loss: {avg_val_log_loss:.4f} --- Save Best log_loss: {best_log_loss:.4f} Model')
        print('\n')
        cb.put(net, avg_val_loss)
    if best_log_loss != float('inf'):
        local_cv_loss += best_log_loss / CFG.n_fold

print(f'local cv loss: {local_cv_loss:.4f}')
```
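After training, the saved fold checkpoints can be used for validation or inference roughly as follows. This is only a sketch that reuses FeedBackModel, FeedBackDataset, Collate, CFG, and OUTPUT_DIR from the script above; the fold index and the slice of df used as dummy test data are illustrative:

```python
# Load one fold's best checkpoint (fold index is illustrative).
net = FeedBackModel(CFG.model)
state = torch.load(OUTPUT_DIR + f"{CFG.model.split('/')[-1]}_fold0_best.pth", map_location='cpu')
net.load_state_dict(state['model'])
net.cuda()
net.eval()

test_loader = torch.utils.data.DataLoader(
    FeedBackDataset(df.head(32).reset_index(drop=True), CFG.model, is_test=True),
    batch_size=CFG.batch_size, shuffle=False,
    collate_fn=Collate(tokenizer, isTrain=False))

all_probs = []
with torch.no_grad():
    for data in test_loader:
        pred = net(data['input_ids'].cuda(), data['attention_mask'].cuda())
        all_probs.append(torch.softmax(pred, dim=-1).cpu())
probs = torch.cat(all_probs)  # [n_samples, 3] class probabilities in the order of new_label
```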
5. A Simple Ablation Study
Feedback Prize - Predicting Effective Arguments (training set: ~36k samples)
| Model | logloss |
| --- | --- |
| deberta large + FGM + 5-fold + fine-tuning | 0.634 |
| deberta large + FGM + 5-fold + prompt | 0.602 |
Relation classification for a domain event-detection task with high robustness requirements (training set: ~42k samples)
| Model | F1 |
| --- | --- |
| nezha wwm base + fine-tuning | 86 |
| nezha wwm base + prompt | 88 |
Text classification on an in-house business dataset (training set: 400k+ samples)
| Model | F1 |
| --- | --- |
| bert base + fine-tuning | 0.9779 |
| bert base + prompt | 0.9806 |
From these results, prompting brings an improvement of a couple of points, but a hand-crafted prompt depends heavily on the designed template; different templates can differ by several points.
6. Conclusion
The improvement described here is essentially an optimization of fine-tuning that yields a solid gain in both few-shot and full-data settings. Prompting is one of the current research hotspots. In industry most scenarios are few-shot because annotation is expensive, and prompting can deliver a sizable lift in those settings without slowing down inference, which gives it real practical value.
7. References
- AI Box column: NLP新宠——浅谈Prompt的前世今生
- Exploiting Cloze Questions for Few Shot Text Classification and Natural Language Inference (PET): https://arxiv.org/pdf/2001.07676.pdf
- Pre-train, Prompt, and Predict: A Systematic Survey of Prompting Methods in Natural Language Processing: https://arxiv.org/pdf/2107.13586.pdf
- 归来仍是少年:提示学习soft prompt浅尝