使用mpi并行技术实现wordcount算法

news2024/11/27 4:00:00

【问题描述】

编写程序统计一个英文文本文件中每个单词的出现次数(词频统计),并将统计结果按单词字典序输出到屏幕上。

注:在此单词为仅由字母组成的字符序列。包含大写字母的单词应将大写字母转换为小写字母后统计。

【输入形式】

打开当前目录下文件article.txt;,从中读取英文单词进行词频统计。

【输出形式】

程序将单词统计结果按单词字典序输出到屏幕上,每行输出一个单词及其出现次数,单词和其出现次数间由一个空格分隔,出现次数后无空格,直接为回车。

【样例输入】

当前目录下文件article.txt内容如下:

Do not take to heart every thing you hear.

Do not spend all that you have.

Do not sleep as long as you want;

【样例输出】

all 1

as 2

do 3

every 1

have 1

hear 1

heart 1

long 1

not 3

sleep 1

spend 1

take 1

that 1

thing 1

to 1

want 1

you 3

【样例说明】

输出单词及其出现次数。

数据集下载:wordcount数据集

提取码:k3v2


代码实现:

#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include <string>
#include <cstring>
#include <fstream>
#include <sstream>
#include <iterator>
#include <vector>
#include <map>
#include <unordered_map>
#include <dirent.h>
#include<algorithm>
#include <iostream>
#include <mpi.h>

using namespace std;

void getFiles(string path, vector<string>& filenames)
{
    DIR *pDir;
    struct dirent* ptr;
    if(!(pDir = opendir(path.c_str()))){
        return;
    }
    while((ptr = readdir(pDir))!=0) {
        if (strcmp(ptr->d_name, ".") != 0 && strcmp(ptr->d_name, "..") != 0){
            filenames.push_back(path + "/" + ptr->d_name);
    }
    }
    closedir(pDir);
}

std::string readFile(std::string filename) {
    std::ifstream in(filename);

    in.seekg(0, std::ios::end);
    size_t len = in.tellg();
    in.seekg(0);
    std::string contents(len + 1, '\0');
    in.read(&contents[0], len);
    return contents;
}

std::vector<std::string> split(std::string const &input) { 
    vector<string> ret;
    int i=0,j=0,n=input.length();
    string temp;
    while(j<n){
        if ((input[j] >= 'a' && input[j] <= 'z') || (input[j] >= 'A' && input[j] <= 'Z')) {
            j++;
        }
        else {
            if (i < j) {
                temp = input.substr(i, j - i);
                ret.emplace_back(temp);
            }
            j++;
            i = j;
       }
   }
   return ret;
}

std::vector<std::string> getWords(
    std::string &content, int rank, int worldsize) {
    std::vector<std::string> wordList = split(content);
    std::vector<std::string> re;
    std::string tmp;
    for (int i = 0 ; i < wordList.size(); i++) {
        if (i % worldsize == rank) {
            //re.push_back(wordList[i]);
            tmp += " " + wordList[i];
        }
    }
    re.push_back(tmp);
    return re;
}

std::vector<pair<std::string, int>> countWords(
    std::vector<std::string> &contentList) {
    // split words
    std::vector<std::string> wordList;
    std::string concat_content;
    for (auto it = contentList.begin(); it != contentList.end(); it++) {
        std::string content = (*it);
        concat_content += " " + content;
    }
    wordList = split(concat_content);

    // do the word count
    std::map<std::string, int> counts;
    for (auto it = wordList.begin(); it != wordList.end(); it++) {
        if (counts.find(*it) != counts.end()) {
            counts[*it] += 1;
        } else {
            counts[*it] = 1;
        }
    }
    std::vector<pair<std::string, int>> res;
    for (auto it = counts.begin(); it != counts.end(); it++) {
        res.push_back(std::make_pair(it->first, it->second));
    }
    return res;
}

std::vector<pair<std::string, int>> mergeCounts(
    std::vector<pair<std::string, int>> &countListA,
    std::vector<pair<std::string, int>> &countListB) {
    std::map<std::string, int> counts;
    for (auto it = countListA.begin(); it != countListA.end(); it++) {
        counts[it->first] = it->second;
    }
    for (auto it = countListB.begin(); it != countListB.end(); it++) {
        if (counts.find(it->first) == counts.end())
            counts[it->first] = it->second;
        else
            counts[it->first] += it->second;
    }
    std::vector<pair<std::string, int>> res;
    for (auto it = counts.begin(); it != counts.end(); it++) {
        res.push_back(std::make_pair(it->first, it->second));
    }
    return res;
}

void sendLocalCounts(int from, int to,
                     std::vector<pair<std::string, int>> &counts) {
    int num = counts.size();
    MPI_Send(&num, 1, MPI_INT, to, from, MPI_COMM_WORLD);

    if (num) {
        int *counts_array = new int[num];
        int i = 0;
        for (auto it = counts.begin(); it != counts.end(); it++, i++) {
            counts_array[i] = it->second;
        }
        MPI_Send(counts_array, num, MPI_INT, to, from, MPI_COMM_WORLD);
        delete counts_array;
    }

    std::string words = " ";
    for (auto it = counts.begin(); it != counts.end(); it++) {
        words += it->first;
        words += " ";
    }
    num = words.length();
    MPI_Send(&num, 1, MPI_INT, to, from, MPI_COMM_WORLD);
    if (num) {
        char *_words = new char[num];
        words.copy(_words, num);
        MPI_Send(_words, num, MPI_CHAR, to, from, MPI_COMM_WORLD);
        delete _words;
    }
}

void recvCounts(int from, int to, std::vector<pair<std::string, int>> &counts) {
    MPI_Status status;
    int _num = 0, num = 0;
    int *counts_array;
    char *_words;
    std::string words;
    MPI_Recv(&_num, 1, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    if (_num) {
        counts_array = new int[_num];
        MPI_Recv(counts_array, _num, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    }

    MPI_Recv(&num, 1, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    if (num) {
        _words = new char[num];
        MPI_Recv(_words, num, MPI_CHAR, from, from, MPI_COMM_WORLD, &status);
        
        for (int _i = 0; _i < num;  _i++) words+=_words[_i];
        delete _words;
    }

    if (_num) {
        std::vector<std::string> word_vec = split(words);
        for (int i = 0; i < _num; i++) {
            counts.push_back(std::make_pair(word_vec[i], counts_array[i]));
        }
        delete counts_array;
    }
}

void treeMerge(int id, int worldSize,
               std::vector<pair<std::string, int>> &counts) {
    int participants = worldSize;
    while (participants > 1) {
        MPI_Barrier(MPI_COMM_WORLD);
        int _participants = participants / 2 + (participants % 2 ? 1 : 0);
        if (id < _participants) {
            if (id + _participants < participants) {
                std::vector<pair<std::string, int>> _counts;
                std::vector<pair<std::string, int>> temp;
                recvCounts(id + _participants, id, _counts);
                temp = mergeCounts(_counts, counts);
                counts = temp;
            }
        }
        if (id >= _participants && id < participants) {
            sendLocalCounts(id, id - _participants, counts);
        }
        participants = _participants;
    }
}

int main(int argc, char *argv[]) {
    int rank;
    int worldSize;
    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /*
    * Word Count for big file
    */
{
    struct timeval start, stop;
    gettimeofday(&start, NULL);
    std::string big_file = "input.txt";
    auto content = readFile(big_file);
    transform(content.begin(),content.end(),content.begin(), ::tolower);
    auto partContent = getWords(content, rank, worldSize);

    auto counts = countWords(partContent);

    treeMerge(rank, worldSize, counts);
    gettimeofday(&stop, NULL);
    
    
    if (rank == 0) {
        fstream dataFile;
        std::string out_file = "./output.txt";
        dataFile.open("./output.txt", ios::out);
        int num = 0;
        for (auto it = counts.begin(); it != counts.end(); it++) {
            //num = it->second/worldSize;
            num = it->second;
            dataFile << it->first << " : " << num << endl;
            cout<< it->first << " : " << num << endl;

        }
    }
    if (rank == 0) {
        cout << "times: "
             << (stop.tv_sec - start.tv_sec) * 1000.0 +
                    (stop.tv_usec - start.tv_usec) / 1000.0
             << " ms"<< endl;
    }
   
}

    MPI_Finalize();
    return 0;
}

编译:mpicxx ./filename.cpp  -o ./filename

运行:mpirun -n 2 ./filename

运行结果:

Mpi:一个线程时:

四个线程时:

加速比:8668.63/3854.48=2.254163985803533

加速效果明显。

Mpi基本原理:

  1.什么是MPI

Massage Passing Interface:是消息传递函数库的标准规范,由MPI论坛开发。

一种新的库描述,不是一种语言。共有上百个函数调用接口,提供与C和Fortran语言的绑定

MPI是一种标准或规范的代表,而不是特指某一个对它的具体实现

MPI是一种消息传递编程模型,并成为这种编程模型的代表和事实上的标准

2.MPI的特点

MPI有以下的特点:

消息传递式并行程序设计

指用户必须通过显式地发送和接收消息来实现处理机间的数据交换。

在这种并行编程中,每个并行进程均有自己独立的地址空间,相互之间访问不能直接进行,必须通过显式的消息传递来实现。

这种编程方式是大规模并行处理机(MPP)和机群(Cluster)采用的主要编程方式。

并行计算粒度大,特别适合于大规模可扩展并行算法

用户决定问题分解策略、进程间的数据交换策略,在挖掘潜在并行性方面更主动,并行计算粒度大,特别适合于大规模可扩展并行算法

消息传递是当前并行计算领域的一个非常重要的并行程序设计方式

二、MPI的基本函数

MPI调用借口的总数虽然庞大,但根据实际编写MPI的经验,常用的MPI函数是以下6个:

MPI_Init(…);

MPI_Comm_size(…);

MPI_Comm_rank(…);

MPI_Send(…);

MPI_Recv(…);

MPI_Finalize();

三、MPI的通信机制

MPI是一种基于消息传递的编程模型,不同进程间通过消息交换数据。

1.MPI点对点通信类型

所谓点对点的通信就是一个进程跟另一个进程的通信,而下面的聚合通信就是一个进程和多个进程的通信。

  1. 标准模式:

该模式下MPI有可能先缓冲该消息,也可能直接发送,可理解为直接送信或通过邮局送信。是最常用的发送方式。

由MPI决定是否缓冲消息

没有足够的系统缓冲区时或出于性能的考虑,MPI可能进行直接拷贝:仅当相应的接收完成后,发送语句才能返回。

这里的系统缓冲区是指由MPI系统管理的缓冲区。而非进程管理的缓冲区。

MPI环境定义有三种缓冲区:应用缓冲区、系统缓冲区、用户向系统注册的通信用缓冲区

MPI缓冲消息:发送语句在相应的接收语句完成前返回。

这时后发送的结束或称发送的完成== 消息已从发送方发出,而不是滞留在发送方的系统缓冲区中。

该模式发送操作的成功与否依赖于接收操作,我们称之为非本地的,即发送操作的成功与否跟本地没关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/665251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

iPhone手机用户们在用的手机桌面便签推荐哪款?

iPhone手机的性能和外观设计是非常好的&#xff0c;很多人在工作和生活中都少不了它的辅助。有人在工作生活中担心会忘掉一些重要的事&#xff0c;在这种情况下可以用便签软件来帮自己把这些重要的事情记录下来。iPhone手机用户们在用的手机桌面便签推荐哪款&#xff1f; 其实…

加密与解密 调试篇 静态分析技术 (一)文件类型/窗口/定位

1.文件类型分析 逆向分析的第一步就是文件类型分析 文件使用什么写的 使用什么编译器编译的 是否被加密过 然后才能进入下一步 有很多工具可以进行分析 我选择exeinfo来查看 但是并不是工具就可以直接分析完成 因为有些会存在欺骗 把入口代码改造成和Visual C 6.0类似的…

04-闭包

闭包&#xff1a;函数嵌套函数&#xff0c;内部函数就是闭包&#xff0c;只有函数内部的子函数才能读取内部变量。 先上一个经典的闭包&#xff1a; function outerFun () {let a 10;function innerFun () {console.log(a);}return innerFun; } let fun outerFun(); fun();…

GPT提示词系统学习-第三课-规范化提示让样本走在提示词前

开篇 本教程将为您提供有关不同类型提示的术语及如何描述它们。尽管提示工程中已经有一些方法来形式化术语,但这个领域仍在不断发展,我们在这篇教程中将给到大家展示一种基于QA形式的通用的、标准的提示语写法。 提示的组成部分 以下是在一个提示中经常会出现的一些组成部…

Triton教程 --- 动态批处理

Triton教程 — 动态批处理 Triton 提供了动态批处理功能&#xff0c;将多个请求组合在一起执行同一模型以提供更大的吞吐量。 默认情况下&#xff0c;只有当每个输入在请求中具有相同的形状时&#xff0c;请求才能被动态批处理。 为了在输入形状经常变化的情况下利用动态批处理…

【PCB专题】Allegro 生成钻孔数据方法

生成Drill Symbol 选择Manufacture->NC->Drill Customization... 在Drill Customization中选择Auto generate symbols(防止钻孔未定义图形符号,选择后会自动产生图形符号)。然后在弹出的警告框中选择是(Y)。最后点击OK。 生成Drill Legend 选择Manufacture->NC-&…

什么是OTN——光传送网?

概要 在现代通信网络中&#xff0c;光纤技术已经成为主流&#xff0c;提供了高速、高带宽的数据传输能力。光传送网&#xff08;Optical Transport Network&#xff0c;OTN&#xff09;是一种基于光纤技术的传输网络&#xff0c;用于实现可靠、高效的光纤通信。本文将详细介绍O…

01 React入门、虚拟DOM

总结 一、React 入门 1.1 特点 高性能、声明式、组件化、单向响应的数据流、JSX扩展、灵活 1.2 React初体验 <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewpo…

AIDA64压力测试教程,AIDA64压力测试多长时间,AIDA64压力测试结果怎么看

硬件管理工具AIDA64功能全面专业&#xff0c;操作简便易学&#xff0c;使用AIDA64进行系统稳定性测试逐渐成为更多用户的选择。可能有很多用户对如何使用AIDA64进行系统稳定性测试不太了解&#xff0c;系统稳定性测试也称为压力测试&#xff0c;或者更通俗的烤机测试&#xff0…

SPI协议详细总结附实例图文讲解通信过程(快速掌握)

目录 一、简介二、数据通信过程2.1 通信总过程总结2.2 具体协议规则2.2.1 时钟极性与时钟相位2.2.2 SPI模式2.2.3 图文实例讲解 2.3 SPI协议优缺点总结 三、其他相关链接 一、简介 SPI(Serial Peripheral nterface&#xff0c;串行外设接口) 协议是一种高速高效率、全双工的通…

数据库SQL Server实验报告 之 SQL数据库的安全性(7/8)

实验名称 数据库的安全性实验 注意&#xff1a;原版word在下载资源里面&#xff08;免费下载&#xff09; 实验目的及要求&#xff1a; 使学生加深对数据库安全性和完整性的理解。掌握SQL Server中有关用户、角色及操作权限…

【Android复习笔记】ARouter / Navigation / EventBus

注:本文主要基于过去 Android View 体系的路由学习笔记整理,不包括最新的 Jetpack Compose 路由体系,如您需了解关于 Jetpack Compose 中的导航路由,请参考 Jetpack Compose 中的导航路由 一文。 传统路由方式 // 显性意图 startActivity(new Intent(this, HomeActivity.c…

JavaScript 的性能分析与提升

JavaScript 的性能分析与提升 对于 JavaScript/前端来说&#xff0c;性能的提升主要有两大方面&#xff1a; 页面初始化的优化 这一方面主要涉及到非代码结构上&#xff0c;但是能够提升用户体验感的优化&#xff0c;如&#xff0c;提升用户看到页面的速度、减少用户等待与页面…

一、枚举类型——使用接口来组织枚举

枚举类型无法被继承&#xff0c;这一点可能有时会让人沮丧。想要继承枚举的动机&#xff0c;一部分源自希望扩充原始枚举中的元素&#xff0c;另一部分源自想要使用子类型来创建不同的子分组。 你可以在一个接口内对元素进行分组&#xff0c;然后基于这个接口生成一个枚举&…

Python零基础入门(三)——基本输入与输出

系列文章目录 个人简介&#xff1a;机电专业在读研究生&#xff0c;CSDN内容合伙人&#xff0c;博主个人首页 Python入门专栏&#xff1a;《Python入门》欢迎阅读&#xff0c;一起进步&#xff01;&#x1f31f;&#x1f31f;&#x1f31f; 码字不易&#xff0c;如果觉得文章不…

关于 Vue3 响应式 API 以及 reactive 和 ref 的用法

文章目录 &#x1f4cb;前言&#x1f3af;关于响应式&#x1f3af;reactive 的用法&#x1f3af;ref 的用法&#x1f4dd;最后 &#x1f4cb;前言 这篇文章记录一下 Vue3 响应式的内容&#xff0c;其中还包括了 reactive 和 ref 的用法。响应式是一种允许以声明式的方式去适应…

VMware16虚拟机安装Ubuntu16.04 LTS

VMware14虚拟机安装Ubuntu16.04 LTS 一、基本介绍二、vmware下安装ubuntu系统2.1 下载ubuntu客户端镜像2.2 安装及配置2.2.1 安装2.2.2 配置 三、ubuntu系统使用 回到目录   回到末尾 一、基本介绍 对于ubuntu而言&#xff0c;就是linux操作系统的具体&#xff0c;而linux对…

S7-1200通过外部端子控制V20变频器启停+MODBUS读写频率的具体方法

S7-1200通过外部端子控制启停+MODBUS读写频率的具体方法 本例中是通过S7-1200PLC外部端子的方式控制变频器启停,用Mobus RTU通讯读写变频器频率。 硬件连接: 屏蔽双绞线将V20变频器P+,N-连接到CPU上CB1241 T/RA 和T/RB, T/RB接P+,T/RA接N-。TA和T/RA用短线连上,TB和T/RB用短…

jmeter函数助手

详解JMeter函数和变量 测试人员可以在JMeter的选项菜单中找到函数助手对话框&#xff08;"Function Helper"对话框&#xff09;&#xff0c;如图11-1所示。 图11-1 函数助手&#xff08;Function Helper&#xff09;对话框 使用函数助手&#xff0c;测试人员可以…

【Python】文件操作 ② ( 文件操作 | 读取文件 | read 函数 | readline 函数 | readlines 函数 )

文章目录 一、读取文件1、read 函数2、readline 函数3、readlines 函数 二、代码示例 - 读取文件1、代码示例 - read 函数读取文件 10 字节内容2、代码示例 - read 函数读取文件所有内容3、代码示例 - readline 函数读取文件一行内容4、代码示例 - readlines 函数读取文件所有内…