from threading import Thread
import os
class InputData:
    """Abstract input source consumed by a map/reduce worker.

    Concrete subclasses decide where the data comes from; they only have to
    provide ``read()``.
    """

    def read(self):
        """Return this input's contents; subclasses must override this."""
        raise NotImplementedError
class PathInputData(InputData):
    """InputData backed by a single file on disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path  # filesystem path of the file to read

    def read(self):
        """Return the whole file decoded as UTF-8.

        The file is opened in a ``with`` block so the handle is closed as
        soon as the contents are read, instead of leaking until garbage
        collection (the original ``open(...).read()`` never closed it).
        """
        with open(self.path, encoding='utf8') as handle:
            return handle.read()
class Worker:
    """Base class for a map/reduce worker bound to one input.

    Subclasses implement ``map()`` to compute ``self.result`` from
    ``self.input_data`` and ``reduce()`` to fold another worker's result
    into this one.
    """

    def __init__(self, input_data):
        self.input_data = input_data  # object whose read() supplies the data
        self.result = 0               # partial result; map() is expected to set it

    def map(self):
        """Compute this worker's partial result (must be overridden)."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other``'s result into this worker (must be overridden)."""
        raise NotImplementedError
class LineCountWorker(Worker):
    """Worker that counts newline characters in its input's text."""

    def map(self):
        """Set ``result`` to the number of newline characters in ``read()``.

        Note: only newline characters are counted, so a final line without a
        trailing newline does not contribute to the count.
        """
        self.result = self.input_data.read().count('\n')

    def reduce(self, other):
        """Add ``other``'s newline count to this worker's count."""
        self.result += other.result
def generate_inputs(data_dir):
    """Yield a PathInputData for every regular file directly in ``data_dir``.

    Args:
        data_dir: directory whose immediate entries are scanned (not
            recursive).

    Non-file entries such as sub-directories are skipped. Calling open() on a
    directory raises an OSError inside the worker thread, and that worker's
    result would silently stay at its initial value, producing a wrong total.
    """
    for name in os.listdir(data_dir):
        full_path = os.path.join(data_dir, name)
        if os.path.isfile(full_path):
            yield PathInputData(full_path)
def create_workers(input_list):
    """Wrap every item of ``input_list`` in a LineCountWorker.

    Prints the received iterable first (a debugging aid that shows whether
    a lazy generator or a concrete list was passed in), then returns a list
    with one worker per input, in iteration order.
    """
    print('input_list', input_list)
    return [LineCountWorker(item) for item in input_list]
def execute(workers):
    """Run each worker's map() in its own thread, then reduce into the first.

    Args:
        workers: sequence of worker objects exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute.

    Returns:
        The first worker's ``result`` after all other workers were reduced
        into it, or 0 when ``workers`` is empty (the original raised
        IndexError in that case).

    Raises:
        An exception raised by a worker's map(), re-raised in the calling
        thread once every thread has finished. Without this, an exception
        inside a thread is merely printed and that worker's result keeps its
        initial value, so the reduced total would be silently wrong.
    """
    if not workers:
        return 0

    errors = []

    def run(worker):
        # Capture failures so they can be surfaced to the caller after join().
        try:
            worker.map()
        except Exception as exc:
            errors.append(exc)

    threads = [Thread(target=run, args=(w,)) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(data_dir):
    """Run the line-count MapReduce over the inputs found in ``data_dir``.

    The inputs come from generate_inputs(data_dir), each one gets a worker
    from create_workers, and the combined result from execute() is returned.
    """
    return execute(create_workers(generate_inputs(data_dir)))
# Run the first (non-generic) version at module level on a fixed directory.
# NOTE(review): this Windows path is specific to the author's machine;
# change it before running elsewhere.
path = "D:\\python3.64\\CorePython\\chapter-6"
result = mapreduce(path)
print('There are', result, 'lines')
# 输出:
# input_list <generator object generate_inputs at 0x00000278722C7BF8>
# There are 83 lines
# 升级版:
from threading import Thread
import os
class GenericInputData:
    """Interface for input sources used by the generic mapreduce().

    Subclasses must provide ``read()`` for a single input and the class-level
    factory ``generate_inputs(config)`` that creates the inputs.
    """

    def read(self):
        """Return this input's contents (must be overridden)."""
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Create the instances of ``cls`` described by ``config``
        (must be overridden)."""
        raise NotImplementedError
class PathInputData(GenericInputData):
    """GenericInputData that reads a single file from disk."""

    def __init__(self, path):
        super().__init__()
        self.path = path  # filesystem path of the file this input reads

    def read(self):
        """Return the file's contents decoded as UTF-8.

        A ``with`` block closes the handle deterministically; the original
        ``open(...).read()`` left it open until garbage collection.
        """
        with open(self.path, encoding='utf-8') as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one ``cls`` instance per regular file in config['data_dir'].

        Args:
            config: mapping that must contain the key 'data_dir' (a directory
                path; only its immediate entries are scanned).

        Non-file entries (e.g. sub-directories) are skipped. Calling open()
        on them raises an OSError inside a worker thread, which would leave
        that worker's result at its initial value and make the total wrong.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            full_path = os.path.join(data_dir, name)
            if os.path.isfile(full_path):
                yield cls(full_path)
class GenericWorker:
    """Base class for a generic map/reduce worker bound to one input.

    Subclasses implement ``map()`` to compute ``self.result`` from
    ``self.input_data`` and ``reduce()`` to fold another worker's result
    into this one. ``create_workers`` builds workers of the calling subclass.
    """

    def __init__(self, input_data):
        self.input_data = input_data  # object this worker processes
        self.result = 0               # partial result; map() is expected to set it

    def map(self):
        """Compute this worker's partial result (must be overridden)."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other``'s result into this worker (must be overridden)."""
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Return a list with one ``cls`` worker per input yielded by
        ``input_class.generate_inputs(config)``, in iteration order."""
        return [cls(item) for item in input_class.generate_inputs(config)]
class LineCountWorker(GenericWorker):
    """Generic worker that counts newline characters in its input's text."""

    def map(self):
        """Set ``result`` to the number of newline characters in ``read()``.

        Note: only newline characters are counted, so a final line without a
        trailing newline does not contribute to the count.
        """
        self.result = self.input_data.read().count('\n')

    def reduce(self, other):
        """Add ``other``'s newline count to this worker's count."""
        self.result += other.result
def execute(workers):
    """Run each worker's map() in its own thread, then reduce into the first.

    Args:
        workers: sequence of worker objects exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute.

    Returns:
        The first worker's ``result`` after all other workers were reduced
        into it, or 0 when ``workers`` is empty (the original raised
        IndexError in that case).

    Raises:
        An exception raised by a worker's map(), re-raised in the calling
        thread once every thread has finished. Without this, an exception
        inside a thread is merely printed and that worker's result keeps its
        initial value, so the reduced total would be silently wrong.
    """
    if not workers:
        return 0

    errors = []

    def run(worker):
        # Capture failures so they can be surfaced to the caller after join().
        try:
            worker.map()
        except Exception as exc:
            errors.append(exc)

    threads = [Thread(target=run, args=(w,)) for w in workers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    if errors:
        raise errors[0]

    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
# 最后,重写mapreduce函数,令其变得完全通用.
def mapreduce(worker_class, input_class, config):
    """Generic MapReduce driver.

    Workers are built with ``worker_class.create_workers(input_class,
    config)``; their combined result, as computed by execute(), is returned.
    """
    return execute(worker_class.create_workers(input_class, config))
# 在测试文件上执行修改后的 MapReduce 程序，所得结果与原来那套实现方式相同。
# 区别在于，现在的 mapreduce 函数需要更多的参数，以便用更加通用的方式来操作相关对象。
# Run the generic version at module level. The directory is passed through a
# config dict; PathInputData.generate_inputs reads it under 'data_dir'.
# NOTE(review): this Windows path is specific to the author's machine;
# change it before running elsewhere.
path = "D:\\python3.64\\CorePython\\chapter-6"
config = {'data_dir': path}
result = mapreduce(LineCountWorker, PathInputData, config)
print('There are', result, 'lines')