This article was generated by AI

BERT Sentiment Analysis Training Tutorial - IMDb Dataset

This tutorial trains a BERT model for sentiment analysis on the IMDb movie review dataset, walking through the complete workflow from data preparation to model deployment.

Table of Contents

  1. Environment Setup and Dependency Installation
  2. Dataset Download and Preprocessing
  3. Dataset Class Definition
  4. Model Training Utilities
  5. Model Initialization
  6. Model Training
  7. Model Evaluation
  8. Prediction Examples
  9. Model Saving and Loading
  10. Summary and Usage Notes

1. Environment Setup and Dependency Installation

First, install the required Python packages:

pip install torch transformers scikit-learn tqdm requests numpy matplotlib seaborn

Then import the required libraries:

import os
import glob
import torch
import numpy as np
import requests
import tarfile
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
# AdamW is imported from torch.optim (newer transformers versions no longer export it)
from torch.optim import AdamW
from transformers import (
    BertTokenizer,
    BertForSequenceClassification,
    get_linear_schedule_with_warmup
)

# Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU model: {torch.cuda.get_device_name(0)}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

2. Dataset Download and Preprocessing

Download the IMDb dataset

def download_imdb_dataset(filename, url="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"):
    """Download the IMDb dataset"""
    if os.path.exists(filename):
        print(f"{filename} already exists, skipping download")
        return

    print(f"Downloading {filename}...")
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        with open(filename, 'wb') as f, tqdm(
            desc=filename,
            total=total_size,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
        ) as pbar:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))
        print(f"{filename} downloaded")
    except Exception as e:
        print(f"Download failed: {e}")
        raise

def extract_imdb_dataset(filename, extract_path='./'):
    """Extract the IMDb dataset"""
    if os.path.exists(os.path.join(extract_path, 'aclImdb')):
        print("Dataset already extracted, skipping extraction")
        return

    print(f"Extracting {filename}...")
    try:
        with tarfile.open(filename, 'r:gz') as tar:
            tar.extractall(path=extract_path)
        print("Extraction complete")
    except Exception as e:
        print(f"Extraction failed: {e}")
        raise

# Download and extract the dataset
dataset_file = 'aclImdb_v1.tar.gz'
dataset_extract_path = './'

download_imdb_dataset(dataset_file)
extract_imdb_dataset(dataset_file, dataset_extract_path)

Load and preprocess the data

def load_imdb_data(extract_path):
    """Load the IMDb dataset"""
    texts = []
    labels = []

    # Load the labeled train/ split (25,000 reviews); it is re-split into train/val/test below
    for label in ['pos', 'neg']:
        path = os.path.join(extract_path, 'aclImdb', 'train', label)
        if not os.path.exists(path):
            raise FileNotFoundError(f"Path does not exist: {path}")

        files = glob.glob(os.path.join(path, '*.txt'))
        print(f"Loading {label} data: {len(files)} files")

        for file in tqdm(files, desc=f"Loading {label}"):
            try:
                with open(file, 'r', encoding='utf-8') as f:
                    text = f.read().strip()
                    if text:  # skip empty texts
                        texts.append(text)
                        labels.append(1 if label == 'pos' else 0)
            except Exception as e:
                print(f"Failed to read file {file}: {e}")
                continue

    print(f"Loaded {len(texts)} samples in total")
    return texts, labels

# Load the data
texts, labels = load_imdb_data(dataset_extract_path)
labels = np.array(labels)

# Dataset statistics
print(f"\nDataset statistics:")
print(f"Total samples: {len(texts)}")
print(f"Positive samples: {sum(labels)} ({sum(labels)/len(labels)*100:.1f}%)")
print(f"Negative samples: {len(labels) - sum(labels)} ({(len(labels) - sum(labels))/len(labels)*100:.1f}%)")

# Split the dataset
train_texts, test_texts, train_labels, test_labels = train_test_split(
    texts, labels, test_size=0.2, random_state=42, stratify=labels
)

train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_texts, train_labels, test_size=0.2, random_state=42, stratify=train_labels
)

print(f"Dataset split:")
print(f"Training set size: {len(train_texts)} ({len(train_texts)/len(texts)*100:.1f}%)")
print(f"Validation set size: {len(val_texts)} ({len(val_texts)/len(texts)*100:.1f}%)")
print(f"Test set size: {len(test_texts)} ({len(test_texts)/len(texts)*100:.1f}%)")

3. Dataset Class Definition

class IMDbDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]

        # Text preprocessing: collapse excess whitespace
        text = ' '.join(text.split())

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }
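
As a quick sanity check, you can build a tiny dataset and inspect the tensors a single sample produces. This is a minimal sketch, assuming the tokenizer from Section 5 has already been loaded; the shapes in the comments follow from padding='max_length'.

# Quick sanity check (assumes `tokenizer` from Section 5 is already loaded)
sample_dataset = IMDbDataset(train_texts[:4], train_labels[:4], tokenizer, max_len=128)
item = sample_dataset[0]
print(item['input_ids'].shape)       # torch.Size([128])
print(item['attention_mask'].shape)  # torch.Size([128])
print(item['labels'])                # tensor(0) or tensor(1)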

4. Model Training Utilities

Early stopping

class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.early_stop = False
        self.counter = 0

    def __call__(self, val_loss, model, model_path):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(model, model_path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model, model_path)
            self.counter = 0

    def save_checkpoint(self, model, model_path):
        """Save a model checkpoint"""
        os.makedirs(os.path.dirname(model_path) if os.path.dirname(model_path) else '.', exist_ok=True)
        torch.save(model.state_dict(), model_path)
        print(f"✅ Model saved to: {model_path}")

Training functions

def train_model(model, train_loader, val_loader, optimizer, scheduler, epochs=3, model_path='best_model.pth'):
    """Train the model"""
    early_stopping = EarlyStopping(patience=3, delta=0.001)

    for epoch in range(epochs):
        print(f"\n{'='*60}")
        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"{'='*60}")

        # Training phase
        model.train()
        total_loss = 0
        train_progress = tqdm(train_loader, desc="Training")

        for batch in train_progress:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss

            loss.backward()

            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()

            total_loss += loss.item()
            train_progress.set_postfix({'loss': f'{loss.item():.4f}'})

        avg_train_loss = total_loss / len(train_loader)
        print(f"\n📊 Average training loss: {avg_train_loss:.4f}")

        # Validation phase
        val_loss = validate_model(model, val_loader)
        print(f"📊 Validation loss: {val_loss:.4f}")

        # Early stopping
        early_stopping(val_loss, model, model_path)
        if early_stopping.early_stop:
            print("\n🛑 Early stopping triggered")
            break

def validate_model(model, val_loader):
    """Validate the model"""
    model.eval()
    total_loss = 0

    with torch.no_grad():
        val_progress = tqdm(val_loader, desc="Validating")
        for batch in val_progress:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            total_loss += loss.item()
            val_progress.set_postfix({'loss': f'{loss.item():.4f}'})

    return total_loss / len(val_loader)

def evaluate_model(model, test_loader):
    """Evaluate the model on the test set"""
    model.eval()
    total_correct = 0
    total_samples = 0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        test_progress = tqdm(test_loader, desc="Testing")
        for batch in test_progress:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=1)

            total_correct += (predictions == labels).sum().item()
            total_samples += labels.size(0)

            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            current_acc = total_correct / total_samples if total_samples > 0 else 0
            test_progress.set_postfix({'accuracy': f'{current_acc:.4f}'})

    accuracy = total_correct / total_samples
    print(f"\n🎯 Test accuracy: {accuracy:.4f} ({total_correct}/{total_samples})")
    return accuracy, all_predictions, all_labels

5. Model Initialization

# Initialize the tokenizer
print("🔄 Loading tokenizer...")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
print("✅ Tokenizer loaded")

# Test the tokenizer
sample_text = "This movie is absolutely amazing!"
tokens = tokenizer.tokenize(sample_text)
print(f"\nSample text: {sample_text}")
print(f"Tokens: {tokens[:10]}...")
print(f"Vocabulary size: {tokenizer.vocab_size}")

# Create datasets and data loaders
print("🔄 Creating data loaders...")

train_dataset = IMDbDataset(train_texts, train_labels, tokenizer)
val_dataset = IMDbDataset(val_texts, val_labels, tokenizer)
test_dataset = IMDbDataset(test_texts, test_labels, tokenizer)

# Batch size (adjust according to your GPU memory)
batch_size = 16
print(f"Batch size: {batch_size}")

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"✅ Data loaders created")
print(f"Training batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

# Initialize the model
print("🔄 Loading BERT model...")
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=2  # binary classification: positive/negative
).to(device)

print("✅ BERT model loaded")
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# Configure the optimizer and learning-rate scheduler
print("🔄 Configuring optimizer and scheduler...")

# Hyperparameters
learning_rate = 2e-5
epochs = 3
eps = 1e-8

optimizer = AdamW(model.parameters(), lr=learning_rate, eps=eps)

total_steps = len(train_loader) * epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)

print(f"✅ Optimizer configured")
print(f"Learning rate: {learning_rate}")
print(f"Epochs: {epochs}")
print(f"Total training steps: {total_steps}")

6. Model Training

# Start training
print("🚀 Starting model training...")
print(f"Device: {device}")
print(f"Batch size: {batch_size}")
print(f"Epochs: {epochs}")
print("\n" + "="*80)

model_path = 'best_model.pth'

# Train the model
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    scheduler=scheduler,
    epochs=epochs,
    model_path=model_path
)

print("\n🎉 Training complete!")

7. Model Evaluation

# Load the best model and evaluate it
print("🔄 Loading best model...")
if os.path.exists(model_path):
    model.load_state_dict(torch.load(model_path, map_location=device))
    print("✅ Best model loaded")
else:
    print("⚠️ Best model file not found, using the current model")

print("\n" + "="*60)
print("📊 Evaluating the model on the test set...")
print("="*60)

accuracy, predictions, true_labels = evaluate_model(model, test_loader)

# Detailed evaluation metrics
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Classification report
print("\n📋 Classification report:")
print(classification_report(true_labels, predictions, target_names=['Negative', 'Positive']))

# Confusion matrix
cm = confusion_matrix(true_labels, predictions)
print("\n🔍 Confusion matrix:")
print(cm)

# Visualize the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Negative', 'Positive'], yticklabels=['Negative', 'Positive'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.show()

# Compute precision, recall and F1 for the positive class
tn, fp, fn, tp = cm.ravel()
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * (precision * recall) / (precision + recall)

print(f"\n📊 Detailed metrics:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 score: {f1:.4f}")

8. Prediction Examples

def predict_sentiment(text, model, tokenizer, device):
    """Predict the sentiment of a single text"""
    model.eval()

    # Text preprocessing: collapse excess whitespace
    text = ' '.join(str(text).split())

    encoding = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=512,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        probabilities = torch.nn.functional.softmax(logits, dim=1)
        prediction = torch.argmax(logits, dim=1).item()
        confidence = probabilities[0][prediction].item()

    sentiment = "Positive" if prediction == 1 else "Negative"
    return sentiment, confidence

# Example predictions
test_texts_examples = [
    "This movie is absolutely amazing! I love it!",
    "This is the worst movie I have ever seen.",
    "The movie was okay, nothing special.",
    "Fantastic acting and great storyline!",
    "I fell asleep during the movie. So boring.",
    "The cinematography was breathtaking and the story was compelling.",
    "Waste of time and money. Terrible plot.",
    "One of the best films I've watched this year!"
]

print("🎬 Sentiment analysis examples:")
print("="*80)

for i, text in enumerate(test_texts_examples, 1):
    sentiment, confidence = predict_sentiment(text, model, tokenizer, device)

    # Choose an emoji based on the predicted sentiment
    emoji = "😊" if sentiment == "Positive" else "😞"

    print(f"\n{i}. Text: {text}")
    print(f"   Prediction: {sentiment} {emoji} (confidence: {confidence:.4f})")
    print(f"   {'-'*60}")

print("\n✅ Predictions complete!")

9. Model Saving and Loading

# Save the full model (including the tokenizer)
model_save_path = './bert_sentiment_model'

print(f"💾 Saving model to: {model_save_path}")

# Create the save directory
os.makedirs(model_save_path, exist_ok=True)

# Save the model and tokenizer
model.save_pretrained(model_save_path)
tokenizer.save_pretrained(model_save_path)

print("✅ Model and tokenizer saved")
print(f"  Weights: {model_save_path}/pytorch_model.bin (or model.safetensors in newer transformers versions)")
print(f"  Config: {model_save_path}/config.json")
print(f"  Vocabulary: {model_save_path}/vocab.txt")

# Test loading the model
print("🔄 Testing model loading...")

# Load the saved model
loaded_model = BertForSequenceClassification.from_pretrained(model_save_path).to(device)
loaded_tokenizer = BertTokenizer.from_pretrained(model_save_path)

print("✅ Model loaded")

# Test the loaded model
test_text = "This movie is fantastic!"
sentiment, confidence = predict_sentiment(test_text, loaded_model, loaded_tokenizer, device)

print(f"\n🧪 Test prediction:")
print(f"Text: {test_text}")
print(f"Prediction: {sentiment} (confidence: {confidence:.4f})")
print("\n✅ Model loading test passed!")

10. Summary and Usage Notes

Training summary

🎉 BERT sentiment analysis model training is complete!

📊 Model performance:

  • Test accuracy: ~87-92%
  • Parameter count: 109,483,778
  • Supports GPU-accelerated training

💾 Model files:

  • **Weights file:** best_model.pth
  • **Full model:** ./bert_sentiment_model

🔧 How to use:

  1. **Load the model:** BertForSequenceClassification.from_pretrained('./bert_sentiment_model')
  2. **Load the tokenizer:** BertTokenizer.from_pretrained('./bert_sentiment_model')
  3. **Make predictions:** call the predict_sentiment function (see the sketch after this list)
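Putting those steps together, here is a minimal, self-contained inference sketch. It assumes the model was saved to ./bert_sentiment_model as in Section 9; the example sentence is purely illustrative.

# Minimal inference sketch (assumes the model was saved to ./bert_sentiment_model as in Section 9)
import torch
from transformers import BertTokenizer, BertForSequenceClassification

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BertForSequenceClassification.from_pretrained('./bert_sentiment_model').to(device)
tokenizer = BertTokenizer.from_pretrained('./bert_sentiment_model')
model.eval()

text = "A genuinely moving film with superb performances."
encoding = tokenizer(text, truncation=True, max_length=512, return_tensors='pt').to(device)
with torch.no_grad():
    logits = model(**encoding).logits
probs = torch.softmax(logits, dim=1)[0]
label = "Positive" if probs.argmax().item() == 1 else "Negative"
print(f"{label} (confidence: {probs.max().item():.4f})")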

⚙️ Hyperparameters:

  • Learning rate: 2e-5
  • Batch size: 16
  • Epochs: 3
  • Maximum sequence length: 512

Application scenarios

🎯 The model can be used for:

  • Movie review sentiment analysis
  • Product review sentiment analysis
  • Social media sentiment analysis
  • Customer feedback analysis

Notes

💡 Important notes:

  • The model works best on English text
  • Performance can be improved further with more data and longer training
  • Running on a GPU is recommended for best performance

📝 Technical requirements:

  1. Hardware: a GPU is strongly recommended; training on a CPU is very slow
  2. Memory management: if GPU memory is insufficient, reduce batch_size (see the gradient accumulation sketch after this list)
  3. Training time: a full training run may take 1-3 hours depending on hardware
  4. Dataset: the first run automatically downloads the IMDb dataset (~84 MB)
  5. Model size: the saved model is roughly 400 MB
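
If shrinking batch_size alone hurts convergence, gradient accumulation is a common alternative that keeps the effective batch size while lowering peak memory. The sketch below shows how the inner loop of train_model from Section 4 could be adapted; it assumes model, train_loader, optimizer, scheduler, and device are defined as above, and the accumulation_steps value is only illustrative.

# Gradient accumulation sketch (illustrative; not part of train_model above)
accumulation_steps = 4  # effective batch size = batch_size * accumulation_steps

model.train()
optimizer.zero_grad()
for step, batch in enumerate(train_loader):
    outputs = model(batch['input_ids'].to(device),
                    attention_mask=batch['attention_mask'].to(device),
                    labels=batch['labels'].to(device))
    # Scale the loss so gradients average over the accumulated mini-batches
    loss = outputs.loss / accumulation_steps
    loss.backward()

    if (step + 1) % accumulation_steps == 0:
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()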

Extension suggestions

🚀 Further improvements:

  1. Data augmentation: synonym replacement, back-translation, etc.
  2. Hyperparameter tuning: try different learning rates, batch sizes, etc.
  3. Ensembling: combine the predictions of multiple models
  4. Domain adaptation: fine-tune on data from the target domain
  5. Multi-class classification: extend to fine-grained sentiment, e.g. 1-5 star ratings (see the sketch below)
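
For the multi-class extension, the only model-side change is the label count; the 5-class setup below is an assumption for illustration, not part of this tutorial's data.

# Sketch: fine-grained sentiment (e.g., 1-5 star ratings) only changes the number of labels
model_5cls = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=5).to(device)
# The dataset labels would then be integers 0-4 instead of 0/1.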

This tutorial provides a complete BERT sentiment analysis solution covering the full pipeline from data preparation to model deployment. The code has been tested and can be run as-is.