纯文本
整个文本只有一行,无换行,字之间空格隔开
![]()
方法一:torchtext
任务:构造语言模型数据集,返回的单个数据类型如下,target为inputs的偏移。
inputs:[A B C D E F]
target:[B C D E F G]
为此我们会使用LanguageModelingDataset建立数据集,然后使用BPTTIterator创建迭代器。
注意:如果文本数过小,且BPTTIterator中设置的batch_size * bptt_len大于文本总长度,则生成的batch的seq_len达不到bptt_len。
如果处理中文,tokenize函数可以使用jieba进行分词:
tokenize = lambda x: jieba.lcut(x)
import torchtext
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BATCH_SIZE = 32
MAX_VOCAB_SIZE = 50000
tokenize = lambda x: x.split()
"""
定义TEXT field用于处理文本的方法
sequential: Whether the datatype represents sequential data. If False, no tokenization is applied. Default: True.
use_vocab: Whether to use a Vocab object. If False, the data in this field should already be numerical. Default: True.
tokenize: The function used to tokenize strings using this field into sequential examples. Default: string.split.
"""
TEXT = torchtext.data.Field(sequential=True, use_vocab=True, tokenize=tokenize, lower=True,
batch_first=True, init_token=None, eos_token=None)
"""
LanguageModelingDataset.split() 处理纯文本数据,分词方法直接使用str.split()
"""
train, val, test = torchtext.datasets.LanguageModelingDataset.splits(path="data",
train="text8.train.txt",
validation="text8.dev.txt",
test="text8.test.txt",
text_field=TEXT)
# 只有一条数据 result=1 每条数据用一个字典表示
print('total example row = ', len(train))
# 打印第一条数据的keys值 result='result'
print(train[0].__dict__.keys())
# 打印第一条数据的values值 result='result'
# print(train[0].__dict__.values())
# create vocabulary
TEXT.build_vocab(train, max_size=MAX_VOCAB_SIZE)
VOCAB_SIZE = len(TEXT.vocab)
print("vocabulary size: ", VOCAB_SIZE)
print(TEXT.vocab.itos[:10])
print(TEXT.vocab.stoi['apple'])
print('<BOS> indx is ', TEXT.vocab.stoi['<BOS>'])
print('<EOS> indx is ', TEXT.vocab.stoi['<EOS>'])
UNK_STR = TEXT.unk_token
PAD_STR = TEXT.pad_token
UNK_IDX = TEXT.vocab.stoi[UNK_STR]
PAD_IDX = TEXT.vocab.stoi[PAD_STR]
print(f'{UNK_STR} index is {UNK_IDX}')
print(f'{PAD_STR} index is {PAD_IDX}')
"""
Defines an iterator for language modeling tasks that use BPTT.
bptt: Length of sequences for backpropagation through time
repeat: Whether to repeat the iterator for multiple epochs. Default: False.
"""
train_iter, val_iter, test_iter = torchtext.data.BPTTIterator.splits((train, val, test), batch_size=BATCH_SIZE,
device=device, bptt_len=50,
repeat=False, shuffle=True)
for batch in train_iter:
print(batch.text.shape) # (batch=32, seqlen=50)
print(batch.target.shape) # (batch=32, seqlen=50)
print(" ".join(TEXT.vocab.itos[i] for i in batch.text[-1, :].data.cpu()))
print(" ".join(TEXT.vocab.itos[i] for i in batch.target[-1, :].data.cpu()))
break
方法二:torch.utils.data
任务:构造一个word2vec的skip-gram数据, 返回的单个数据类型如下:
[center_word, pos_words, neg_words]
center_word.shape: (batchSize)
pos_words.shape: (batchSize, 2*C)
neg_words.shape: (batchSize, 2*C*K)
可以使用torch.utils.data的Dataset和DataLoader进行数据的处理。
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import numpy as np
K = 100 # number of negative samples
C = 3 # nearby words threshold
MAX_VOCAB_SIZE = 30000 # the vocabulary size
BATCH_SIZE = 128 # the batch size
train_text = open('data/text8.train.txt', 'r').read()
# val_text = open('data/text8.dev.txt', 'r').read()
# test_text = open('data/text8.test.txt', 'r').read()
text = [word for word in train_text.split()]
vocab = dict(Counter(text).most_common(MAX_VOCAB_SIZE-1))
vocab["<unk>"] = len(text) - np.sum(list(vocab.values()))
idx_to_word = [word for word in vocab.keys()]
word_to_idx = {word: i for i, word in enumerate(idx_to_word)}
"""
统计词典中词出现的频率
"""
# 获取单词出现的个数
word_counts = np.array([count for count in vocab.values()], dtype=np.float32)
# 计算频率
word_freqs = word_counts / np.sum(word_counts)
# 0.75 次幂
word_freqs = word_freqs ** (3./4.)
# 归一化
word_freqs = word_freqs / np.sum(word_freqs) # 用来做 negative sampling
VOCAB_SIZE = len(idx_to_word)
class WordEmbeddingDataset(torch.utils.data.Dataset):
def __init__(self, text, word_to_idx, idx_to_word, word_freqs, word_counts):
super(WordEmbeddingDataset, self).__init__()
# 将单词转换成数字索引
self.text_encoded = [word_to_idx.get(t, VOCAB_SIZE - 1) for t in text]
self.text_encoded = torch.Tensor(self.text_encoded).long()
# dict:word->index
self.word_to_idx = word_to_idx
# list: index->word
self.idx_to_word = idx_to_word
# 单词频率
self.word_freqs = torch.Tensor(word_freqs)
# 单词次数统计
self.word_counts = torch.Tensor(word_counts)
def __len__(self):
return len(self.text_encoded)
def __getitem__(self, idx):
# 中心词
center_word = self.text_encoded[idx]
# 周边词
pos_indices = list(range(idx - C, idx)) + list(range(idx + 1, idx + C + 1))
pos_indices = [i % len(self.text_encoded) for i in pos_indices]
# 正采样
pos_words = self.text_encoded[pos_indices]
# 负采样
neg_words = torch.multinomial(self.word_freqs, K * pos_words.shape[0], True)
return center_word, pos_words, neg_words
dataset = WordEmbeddingDataset(text, word_to_idx, idx_to_word, word_freqs, word_counts)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)