You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

530 lines
17 KiB

/*
* pvm_functions.c
*
* Copyright (C) Marzio Malanchini - July 2003
*
* This file is part of transcode, a video stream processing tool
*
* transcode is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2, or (at your option)
* any later version.
*
* transcode is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with GNU Make; see the file COPYING. If not, write to
* the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
*
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <pvm_functions.h>
#define MAX_BUF 1024
int f_pvm_start_single_process(char *p_spawn_process,char **p_argv,char *p_where)
{
int s_slave_tid;
if (pvm_spawn(p_spawn_process,p_argv,PvmTaskHost,p_where,1,&s_slave_tid)<0) /* start up merger */
{
pvm_perror("");
return(-1);
}
return(s_slave_tid);
}
void f_pvm_stop_single_process(int s_slave_tid)
{
if (s_slave_tid >=0)
pvm_kill(s_slave_tid);
}
static int f_pvm_send_all(int s_buff_size,char *p_buffer,int s_option,pvm_func_t *p_func,int s_type,int s_pos_tids)
{
static int s_seq=-1;
if (s_type!=-1)
{
s_seq=s_type-1;
return(0);
}
if (s_pos_tids>p_func->s_nproc)
{
return(-1);
}
if (p_func->p_slave_tids ==NULL)
return(-1);
if (s_seq+1==PVM_MSG_LAST_SEQ)
s_seq=0;
else
s_seq++;
pvm_initsend(PvmDataDefault);
pvm_pkint(&s_seq,1,1);
pvm_pkint(&s_option,1,1);
pvm_pkint(&s_buff_size,1,1);
pvm_pkbyte(p_buffer,s_buff_size,1);
pvm_send(p_func->p_slave_tids[s_pos_tids],PVM_MSG_WORK);
return(s_seq);
}
int f_pvm_set_send(int s_set_seq)
{
return(f_pvm_send_all(0,NULL,0,NULL,s_set_seq,0));
}
int f_pvm_send(int s_buff_size,char *p_buffer,int s_option,int s_pos_tids,pvm_func_t *p_func)
{
return(f_pvm_send_all(s_buff_size,p_buffer,s_option,p_func,-1,s_pos_tids));
}
static int f_pvm_multi_send_all(int s_buff_size,char *p_buffer,int s_option,pvm_func_t *p_func,int s_wait)
{
static int s_seq=-1;
int s_rec_seq,s_rc;
if (p_func->p_slave_tids ==NULL)
return(-1);
pvm_initsend(PvmDataDefault);
if (!s_wait)
{
s_rec_seq=-1;
pvm_pkint(&s_rec_seq,1,1); /*no seq number */
}
else
{
s_seq++;
pvm_pkint(&s_seq,1,1);
}
pvm_pkint(&s_option,1,1);
pvm_pkint(&s_buff_size,1,1);
pvm_pkbyte(p_buffer,s_buff_size,1);
pvm_mcast(p_func->p_slave_tids,p_func->s_nproc,PVM_MSG_CONF);
if (s_wait)
{
for(;;)
{
pvm_recv(-1,PVM_MSG_RING); /*waiting the close of the token*/
pvm_upkint(&s_rec_seq,1,1); /*send the response to number*/
pvm_upkint(&s_rc,1,1); /*retrive the rc*/
if (s_rec_seq==s_seq) /*if the seq number match return ok*/
return(s_rc);
}
}
else
{
return(0);
}
}
int f_pvm_multi_send_nw(int s_buff_size,char *p_buffer,int s_option,pvm_func_t *p_func)
{
return(f_pvm_multi_send_all(s_buff_size,p_buffer,s_option,p_func,0));
}
int f_pvm_multi_send(int s_buff_size,char *p_buffer,int s_option,pvm_func_t *p_func)
{
return(f_pvm_multi_send_all(s_buff_size,p_buffer,s_option,p_func,1));
}
static int f_pvm_nrecv_check(int *p_buff_size,char *p_buffer,int s_init_seq,int *s_rc)
{
static int s_seq_rec=0;
int s_tmp,s_bs,s_rc_int;
if (s_init_seq != -1)
{
s_seq_rec=s_init_seq; /*set the inital seq number*/
*s_rc=0;
return(0);
}
if(pvm_nrecv(-1,s_seq_rec)!=0)
{
pvm_upkint(&s_tmp,1,1); /*decode the seq number of the message*/
pvm_upkint(&s_rc_int,1,1); /*decode the function rc */
*s_rc=s_rc_int; /*rc*/
pvm_upkint(&s_bs,1,1); /*decode the size of the next message*/
if (s_bs!=0)
pvm_upkbyte(p_buffer,s_bs,1); /*decode the received buffer*/
*p_buff_size=s_bs;
s_seq_rec++; /*this is the response of my request*/
return(s_tmp); /*return the seq number of the recv message*/
}
*s_rc=0;
return(-1);
}
int f_pvm_nrecv(int *p_buff_size,char *p_buffer,int *s_rc)
{
return(f_pvm_nrecv_check(p_buff_size,p_buffer,-1,s_rc));
}
int f_pvm_set_nrecv(int s_seq)
{
int s_rc;
return(f_pvm_nrecv_check(NULL,NULL,s_seq,&s_rc));
}
static int f_pvm_recv_check(int *p_buff_size,char *p_buffer,int s_init_seq,int *s_rc)
{
static int s_seq_rec=0;
int s_tmp,s_bs,s_rc_int;
if (s_init_seq != -1)
{
s_seq_rec=s_init_seq; /*set the inital seq number*/
*s_rc=0;
return(0);
}
pvm_recv(-1,s_seq_rec);
pvm_upkint(&s_tmp,1,1); /*decode the seq number of the message*/
pvm_upkint(&s_rc_int,1,1); /*decode the function rc */
*s_rc=s_rc_int; /*rc*/
pvm_upkint(&s_bs,1,1); /*decode the size of the next message*/
if (s_bs!=0)
pvm_upkbyte(p_buffer,s_bs,1); /*decode the received buffer*/
if (s_seq_rec==INT_MAX)
s_seq_rec=0;
else
s_seq_rec++; /*this is the response of my request*/
*p_buff_size=s_bs;
return(s_tmp); /*return the seq number of the recv message*/
}
int f_pvm_recv(int *p_buff_size,char *p_buffer,int *s_rc)
{
return(f_pvm_recv_check(p_buff_size,p_buffer,-1,s_rc));
}
int f_pvm_set_recv(int s_seq)
{
int s_rc;
return(f_pvm_recv_check(NULL,NULL,s_seq,&s_rc));
}
/* This function must be included in the spawn process: it initialize the skeduler */
void f_pvm_skeduler(pvm_res_func_t *(*f_my_elab_func)(int,char *,int,int))
{
int s_father_tid,s_rec_seq,s_src_tid,s_msg_type,s_msg_size,s_bufid;
int s_size=0,s_option,s_rc,s_my_tid,*p_other_tids,s_num_tids,s_cont;
static int s_alloc_size=0,s_nfrxtask=1,s_join=-1;
static char *p_buffer=NULL;
pvm_res_func_t *p_result=NULL;
char s_hostname[MAX_BUF];
static int s_seq_preinit=-1;
s_father_tid=pvm_parent();
s_my_tid=pvm_mytid();
s_num_tids = pvm_siblings(&p_other_tids); /* determine the size of my sibling list */
for(;;)
{
s_bufid=pvm_recv(-1,-1); /*waiting for a message*/
pvm_bufinfo(s_bufid,&s_msg_size,&s_msg_type,&s_src_tid); /*retrive info about the received message*/
/*accept only from father and from all the slave process to merger not from itself or from ring*/
if ((s_src_tid==s_father_tid)||((s_src_tid!=s_my_tid)&&(s_msg_type==PVM_MSG_JOIN)))
{
pvm_upkint(&s_rec_seq,1,1); /*retrive the sequence number*/
pvm_upkint(&s_option,1,1); /*retrive the option number*/
pvm_upkint(&s_size,1,1); /*retrive the size*/
if (s_alloc_size < s_size) /*check the size of the buffer*/
{
p_buffer=(char *)realloc(p_buffer,s_size); /*allocate/reallocate a buffer */
s_alloc_size=s_size;
}
memset(p_buffer,'\0',s_alloc_size);
if (s_size>0)
pvm_upkbyte(p_buffer,s_size,1); /*data packet*/
if ((s_msg_type==PVM_MSG_CONF)&&(s_option==PVM_CHECK_VERSION)) /*check which the type of msg*/
{
if (memcmp((char *)EXPORT_PVM_VERSION,p_buffer,strlen(EXPORT_PVM_VERSION)) !=0)
{
s_rc=1;
if((gethostname(s_hostname,sizeof(s_hostname)))!=0)
{
memset(s_hostname,'\0',sizeof(s_hostname));
snprintf(s_hostname,sizeof(s_hostname),"localhost-%d\n",getpid());
}
fprintf(stderr,"(%s) Invalid version: (%s) request (%s) on host %s\n",__FILE__,EXPORT_PVM_VERSION,p_buffer,s_hostname);
}
else
s_rc=0;
}
else if ((s_msg_type==PVM_MSG_CONF)&&(s_option==PVM_INIT_SKED)) /*check which the type of msg*/
{
memcpy(&s_nfrxtask,p_buffer,s_size);
s_rc=0;
}
else if ((s_msg_type==PVM_MSG_CONF)&&(s_option==PVM_INIT_JOIN)) /*check which the type of msg*/
{
memcpy(&s_join,p_buffer,s_size);
s_rc=0;
}
else if ((s_msg_type==PVM_MSG_WORK)&&(s_option==PVM_MERGER_INIT)) /*check which the type of msg*/
{
memcpy(&s_join,p_buffer,s_size);
s_msg_type=PVM_MSG_WRKN;
s_rc=0;
}
else if ((s_msg_type==PVM_MSG_WORK)&&(s_option==PVM_EXP_OPT_PREINIT)) /*check the option of msg*/
{
memcpy(&s_seq_preinit,p_buffer,s_size);
s_msg_type=PVM_MSG_WRKN;
s_rc=0;
}
else
{
if ((s_seq_preinit!=-1)&&((s_option==PVM_EXP_OPT_OPEN)||(s_option==PVM_EXP_OPT_INIT)))
p_result=(*f_my_elab_func)(s_option,p_buffer,s_size,s_seq_preinit);
else
p_result=(*f_my_elab_func)(s_option,p_buffer,s_size,s_rec_seq);
s_rc=p_result->s_rc;
if (p_result->s_msg_type != s_msg_type)
s_msg_type=p_result->s_msg_type; /*the type of msg can be change by the f_my_elab_func routine*/
}
switch (s_msg_type) /*check which the type of msg*/
{
case PVM_JOIN_OPT_ADD_ELAB:
if (s_join==-1)
{
fprintf(stderr,"(%s) Merger not yet started\n",__FILE__);
for (s_cont=0;s_cont< s_num_tids;s_cont++) /* determine the index of my task*/
{
if (p_other_tids[s_cont]!=s_my_tid)
{
pvm_kill(p_other_tids[s_cont]);
}
}
pvm_kill(s_father_tid);
break;
}
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
s_cont=PVM_JOIN_OPT_ADD_ELAB; /*add to remove list*/
pvm_pkint(&s_cont,1,1); /*option data packet*/
pvm_pkint(&(p_result->s_ret_size),1,1); /*data packet*/
pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*data packet*/
pvm_send(s_join,PVM_MSG_JOIN); /*send the packet to the merger process*/
break;
case PVM_MSG_CONF_JOIN:
if (s_join==-1)
{
fprintf(stderr,"(%s) Merger not yet started\n",__FILE__);
for (s_cont=0;s_cont< s_num_tids;s_cont++) /* determine the index of my task*/
{
if (p_other_tids[s_cont]!=s_my_tid)
{
pvm_kill(p_other_tids[s_cont]);
}
}
pvm_kill(s_father_tid);
break;
}
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
s_cont=PVM_JOIN_OPT_ADD_REMOVE; /*add to remove list*/
pvm_pkint(&s_cont,1,1); /*option data packet*/
pvm_pkint(&(p_result->s_ret_size),1,1); /*data packet*/
pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*data packet*/
pvm_send(s_join,PVM_MSG_JOIN); /*send the packet to the merger process*/
/*don't close the case need to send to the ring*/
case PVM_MSG_CONF:
s_msg_type=PVM_MSG_RING;
if (s_rec_seq!=-1)
(int)f_ring(s_father_tid,&s_rec_seq,s_msg_type,s_rc); /*wait for all task elaboration*/
break;
case PVM_MSG_WORK:
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
pvm_pkint(&(p_result->s_rc),1,1); /*send the function rc*/
pvm_pkint(&(p_result->s_ret_size),1,1); /*data packet*/
pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*data packet*/
pvm_send(s_father_tid,s_rec_seq); /*send the packet*/
break;
case PVM_MSG_ENDTASK_SYSTEM:
pvm_initsend(PvmDataDefault); /*initialize the send*/
s_cont=0;
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
pvm_pkint(&(p_result->s_rc),1,1); /*send the function rc*/
pvm_pkint(&s_cont,1,1); /*data packet*/
// pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*not really used*/
pvm_send(s_father_tid,PVM_MSG_ENDTASK_SYSTEM); /*send the packet*/
break;
case PVM_MSG_ENDTASK_VIDEO:
case PVM_MSG_ENDTASK_AUDIO:
pvm_initsend(PvmDataDefault); /*initialize the send*/
s_cont=0;
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
pvm_pkint(&(p_result->s_rc),1,1); /*send the function rc*/
pvm_pkint(&s_cont,1,1); /*data packet*/
// pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*not really used*/
if (s_msg_type==PVM_MSG_ENDTASK_VIDEO)
pvm_send(s_father_tid,PVM_MSG_ENDTASK_VIDEO); /*send the packet*/
else
pvm_send(s_father_tid,PVM_MSG_ENDTASK_AUDIO); /*send the packet*/
break;
case PVM_MSG_MERG_SEND:
case PVM_MSG_ADD_REM:
if (s_join==-1)
{
fprintf(stderr,"(%s) Merger not yet started\n",__FILE__);
for (s_cont=0;s_cont< s_num_tids;s_cont++) /* determine the index of my task*/
{
if (p_other_tids[s_cont]!=s_my_tid)
{
pvm_kill(p_other_tids[s_cont]);
}
}
pvm_kill(s_father_tid);
break;
}
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rec_seq,1,1); /*send packet sequence*/
if (s_msg_type==PVM_MSG_MERG_SEND)
s_cont=PVM_MSG_MERG_PASTE; /*add to remove list*/
else
s_cont=PVM_JOIN_OPT_ADD_REMOVE; /*add to remove list*/
pvm_pkint(&s_cont,1,1); /*option data packet*/
pvm_pkint(&(p_result->s_ret_size),1,1); /*data packet*/
pvm_pkbyte(p_result->p_result,p_result->s_ret_size,1); /*data packet*/
pvm_send(s_join,PVM_MSG_JOIN); /*send the packet to the merger process*/
break;
case PVM_MSG_WRKN:
case PVM_MSG_JOIN:
default:
break;
}
}
}
}
pvm_func_t *f_pvm_master_start_stop(char *p_option,char *p_spawn_process,char **p_argv,int s_nproc_host,int s_nproc_max,pvm_func_t *p_func)
{
int s_master_tid=-1; /* my task id */
static int s_num_call=0; /* my task id */
int s_started_task,s_cont,s_num_hosts,s_num_arch;
struct pvmhostinfo *p_host;
if (!strcasecmp(p_option, "close"))
{
if (p_func->p_slave_tids !=NULL)
for(s_cont=0;s_cont<p_func->s_nproc;s_cont++)
pvm_kill((p_func->p_slave_tids[s_cont])); /*terminate all slave process*/
if (s_num_call==1)
pvm_exit(); /* remove the master task from pvm*/
s_num_call--;
free(p_func->p_used_tid);
return(NULL);
}
else if (!strcasecmp(p_option, "open"))
{
memset((char *)p_func,'\0',sizeof(pvm_func_t));
s_num_call++; /*number of recall*/
p_func->s_nproc=0;
if((p_func->p_slave_tids=calloc(sizeof(int)*s_nproc_max,1)) == NULL) /*allocate the slave buffer*/
{
fprintf(stderr,"(%s) error allocating memory\n",__FILE__);
return(NULL);
}
if (s_num_call==1)
s_master_tid = pvm_mytid(); /* register the task in pvm */
/* Get config and set number of slaves to start */
pvm_config( &s_num_hosts, &s_num_arch, &p_host );
p_func->s_nhosts = s_num_hosts;
p_func->s_nproc = s_num_hosts * s_nproc_host;
if(p_func->s_nproc > s_nproc_max)
p_func->s_nproc = s_nproc_max;
p_func->p_used_tid=(int *)malloc(sizeof(int)*(p_func->s_nproc)); /*allocate the buffer for the serial number of tid*/
pvm_setopt(PvmShowTids,0);
pvm_catchout(stderr);
if ((s_started_task=pvm_spawn(p_spawn_process,p_argv,PvmTaskDefault,"",p_func->s_nproc,p_func->p_slave_tids))<0) /* start up all slave tasks */
{
pvm_perror("");
return(NULL);
}
else if (s_started_task < p_func->s_nproc)
{
for(s_cont=0;s_cont<p_func->s_nproc;s_cont++)
pvm_kill((p_func->p_slave_tids[s_cont])); /*terminate all slave process*/
pvm_exit(); /* remove the master task from pvm*/
return(NULL);
}
return(p_func);
}
else
{
fprintf(stderr,"(%s) invalid command \n",__FILE__);
return(NULL);
}
}
int f_ring(int s_father_tid,int *s_rec_seq,int s_msg_type,int s_rc)
{
int s_mytid; /* my task id */
int *p_other_tids; /* array of task ids */
int s_my_proc_id=0; /* my process number */
int s_cont;
int s_num_tids;
int s_src_tid,s_dst_tid,s_rc_prev;
s_mytid = pvm_mytid(); /*retrive tid*/
s_num_tids = pvm_siblings(&p_other_tids); /* determine the size of my sibling list */
for (s_cont=0;s_cont< s_num_tids;s_cont++) /* determine the index of my task*/
{
if (p_other_tids[s_cont]==s_mytid)
{
s_my_proc_id=s_cont;
break;
}
}
if (s_my_proc_id==0)
s_src_tid=p_other_tids[s_num_tids-1]; /* if i'm the task 0 i need to wait the wake up of the task n. s_num_tids-1 */
else
s_src_tid=p_other_tids[s_my_proc_id-1]; /* the other cases */
if (s_my_proc_id==s_num_tids-1)
s_dst_tid=p_other_tids[0]; /* if i'm the last task i need to wait the wake up of task n. 0 */
else
s_dst_tid=p_other_tids[s_my_proc_id+1]; /* the other cases */
if(s_my_proc_id==0)
{
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rc,1,1); /*send the rc*/
pvm_send(s_dst_tid,PVM_MSG_RING); /*send to the next process*/
pvm_recv(s_src_tid,PVM_MSG_RING); /*waiting the close of the token*/
pvm_upkint(&s_rc_prev,1,1); /*receive the prev rc*/
pvm_initsend(PvmDataDefault); /*initialize the send to the main process*/
pvm_pkint(s_rec_seq,1,1); /*send the response to number*/
pvm_pkint(&s_rc_prev,1,1); /*send the rc*/
pvm_send(s_father_tid,s_msg_type); /*send the ok to the mcast process*/
return(0);
}
pvm_recv(s_src_tid,PVM_MSG_RING); /*waiting for the token*/
pvm_upkint(&s_rc_prev,1,1); /*receive the prev rc*/
if (s_rc_prev)
s_rc=s_rc_prev; /*send the prev error */
pvm_initsend(PvmDataDefault); /*initialize the send*/
pvm_pkint(&s_rc,1,1); /*send the rc*/
pvm_send(s_dst_tid,PVM_MSG_RING); /*send to the next process*/
return(0);
}