使用C语言中的fork和pipe函数来计算具有多个进程的文件中的行,字符和单词

问题描述

我正在尝试使用带有fork和pipe函数的多个进程来修改一个程序,该程序读取文件并返回行,单词和字符的总数。该程序可以编译并正常运行,但是当前仅当用户为子进程数输入1时,输出才正确。对于每隔一个正整数,程序将返回正确的值乘以进程数。 (例如:如果文件中的行数为4,而用户输入的进程数为2,则返回值为8)有谁知道我可以解决这个问题,以便每个进程将读取文件的工作而不是每次读取的工作分摊通过整个事情?用户在运行程序时输入文件名和所需的子进程数。我目前不关心效率,只是输出正确。这是我的代码

//wc.h
#ifndef WC_H
#define WC_H

#include <stdio.h>

typedef struct count_t {
        int linecount;
        int wordcount;
        int charcount;
} count_t;

count_t word_count(FILE* fp,long offset,long size);
extern int crashRate;

#endif



//wc_mul.c
#include "wc.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>

#define MAX_PROC 100
#define MAX_FORK 100

int crashRate = 0;

count_t word_count(FILE* fp,long size)
{
        char ch;
        long rbytes = 0;

        count_t count;
        // Initialize counter variables
        count.linecount = 0;
        count.wordcount = 0;
        count.charcount = 0;
        
        printf("[pid %d] reading %ld bytes from offset %ld\n",getpid(),size,offset);

        if(fseek(fp,offset,SEEK_SET) < 0) {
                printf("[pid %d] fseek error!\n",getpid());
        }

        while ((ch=getc(fp)) != EOF && rbytes < size) {
                // Increment character count if NOT new line or space
                if (ch != ' ' && ch != '\n') { ++count.charcount; }

                // Increment word count if new line or space character
                if (ch == ' ' || ch == '\n') { ++count.wordcount; }

                // Increment line count if new line character
                if (ch == '\n') { ++count.linecount; }
                rbytes++;
        }

        srand(getpid());
        if(crashRate > 0 && (rand()%100 < crashRate)) 
        {
                printf("[pid %d] crashed.\n",getpid());
                abort();
        }

        return count;
}


int main(int argc,char **argv)
{
                long fsize;
                FILE *fp;
                int numJobs;
        //plist_t plist[MAX_PROC];
                count_t total,count,buf;
                int i,j,pid,status,p[2];
                int nFork = 0;

        if(argc < 3) {
                printf("usage: wc <# of processes> <filname>\n");
                return 0;
        }
        
        if(argc > 3) {
                crashRate = atoi(argv[3]);
                if(crashRate < 0) crashRate = 0;
                if(crashRate > 50) crashRate = 50;
        }
        printf("crashRate RATE: %d\n",crashRate);


        numJobs = atoi(argv[1]);
        if(numJobs > MAX_PROC) numJobs = MAX_PROC;

        total.linecount = 0;
        total.wordcount = 0;
        total.charcount = 0;

        // Open file in read-only mode
        fp = fopen(argv[2],"r");

        if(fp == NULL) {
                printf("File open error: %s\n",argv[2]);
                printf("usage: wc <# of processes> <filname>\n");
                return 0;
        }

        fseek(fp,0L,SEEK_END);
        fsize = ftell(fp);
        
        fclose(fp);
        // calculate file offset and size to read for each child
        
        for(i = 0; i < numJobs; i++) {
                if(nFork++ > MAX_FORK) return 0;
                if (pipe(p) != 0)
                  exit(1);
                pid = fork();
                if(pid < 0) {
                        printf("Fork Failed.\n");
                } else if(pid == 0) {
                        // Child
                        fp = fopen(argv[2],"r");           
                        count = word_count(fp,fsize);
                        write(p[1],&count,sizeof(count));
                        close(p[0]);
                        close(p[1]);
                        // send the result to the parent through the message queue
                        fclose(fp);
                        return 0;
                }
        }
        waitpid(pid,&status,0);
        close(p[1]);
        for (j=0; j < numJobs; j++) {
          read(p[0],&buf,sizeof(count));
          total.linecount += buf.linecount;
          total.wordcount += buf.wordcount;
          total.charcount += buf.charcount;
        }
        // Parent
        // wait for all children
        // check their exit status
        // read the result from normalliy terminated child
        // re-crete new child if there is one or more Failed child
        
        printf("\n========== Final Results ================\n");
        printf("Total Lines : %d \n",total.linecount);
        printf("Total Words : %d \n",total.wordcount);
        printf("Total Characters : %d \n",total.charcount);
        printf("=========================================\n");
        return(0);
}

解决方法

  1. 必须{strong>在之前打开pipe。否则,它在父级中不可用。

  2. 指针传递给count而不是count本身,传递给readwrite

write(p[1],&count,sizeof(count)); .. read(p[0],&buf,sizeof(count));

  1. 每个孩子都需要在write之后通过close(p[1])关闭管道的写入端。现在,您只需要关闭读取端即可。

PS:建议对readwrite添加通常的结果检查。


如果您将单个pipe用于多个进程,则第一个孩子关闭pipe后,它将关闭并且无法再读取。

您需要一组管道,每个进程一个:

int p[numJobs][2];

以下代码对我有用。我添加了一些printf以便更好地理解。

int main(int argc,char **argv)
{
    long fsize;
    FILE *fp;
    int numJobs;

    count_t total,count,buf;
    int i,j,pid,status;

    if(argc < 3) {
            printf("usage: wc <# of processes> <filname>\n");
            return 0;
    }
        
    if(argc > 3) {
            crashRate = atoi(argv[3]);
            if(crashRate < 0) crashRate = 0;
            if(crashRate > 50) crashRate = 50;
    }
    printf("crashRate RATE: %d\n",crashRate);


    numJobs = atoi(argv[1]);
    if(numJobs > MAX_PROC) numJobs = MAX_PROC;

    int p[numJobs][2];
    
    total.linecount = 0;
    total.wordcount = 0;
    total.charcount = 0;

    // Open file in read-only mode
    fp = fopen(argv[2],"r");

    if(fp == NULL) {
            printf("File open error: %s\n",argv[2]);
            printf("usage: wc <# of processes> <filname>\n");
            return 0;
    }

    fseek(fp,0L,SEEK_END);
    fsize = ftell(fp);
    
    fclose(fp);
        // calculate file offset and size to read for each child
        
    for(i = 0; i < numJobs; i++) {
        
        if (pipe(p[i]) != 0) exit(1);

        pid = fork();
        
        if(pid < 0) {
            printf("Fork failed.\n");
            exit(1);
        } else if(pid == 0) {
            
            // Child
            fp = fopen(argv[2],"r");
            count = word_count(fp,fsize);
            fclose(fp);

            close(p[i][0]);

            // send the result to the parent through the message queue
            long bytes_sent;
            
            if ( (bytes_sent = write(p[i][1],sizeof(count)) ) ==-1) {
                printf("Writing into pipe failed.\n");
                exit(1);
            };
            printf("Child  process %d sent %ld bytes (%'d lines,%'d words,%'d chars) \n",getpid(),bytes_sent,count.linecount,count.wordcount,count.charcount);
            
            close(p[i][1]);
            _exit(0);
            
        }
    }
    
    
    // wait for all child processes to close
    while(wait(NULL) != -1){};

    long bytes_read;
    
    for (j=0; j < numJobs; j++) {
                
        if ((bytes_read = read(p[j][0],sizeof(buf)) ) ==-1) {
            printf("Reading from pipe failed.\n");
            exit(1);
        };

        if (bytes_read){
            
            printf("Parent process %d read %ld bytes (%'d lines,bytes_read,buf.linecount,buf.wordcount,buf.charcount);

            total.linecount += buf.linecount;
            total.wordcount += buf.wordcount;
            total.charcount += buf.charcount;
        }
                    
        close(p[j][0]);
        close(p[j][1]);
    }


    // Parent
        // wait for all children
        // check their exit status
        // read the result from normalliy terminated child
        // re-create new child if there is one or more failed child
        
        printf("\n========== Final Results ================\n");
        printf("Total Lines : %d \n",total.linecount);
        printf("Total Words : %d \n",total.wordcount);
        printf("Total Characters : %d \n",total.charcount);
        printf("=========================================\n");
        return(0);
}