Computing.Net > Forums > OpenVMS > VMS Threads/ASTs... Problems

Computer Problems? Computing.Net has over 1,000,000 posts about all things technology related! Over 90% answered within 24 hours! Click here to start participating now! Also, be sure to check out the New User Guide.

VMS Threads/ASTs... Problems

Reply to Message Icon

Name: craigers1
Date: August 13, 2004 at 08:42:24 Pacific
OS: OpenVMS 6.2
CPU/Ram: 700 Series 600 processor
Comment:

I am running OpenVMS 6.2 on a multiprocessor Dec machine. I written a Tcp-Server that accepts job requests from a remote client, and passes these requests to a "database process" running on the same system as the Tcp Server. I am new to all of the concepts required: Threads, C code, ASTs, MBX, etc. My code works, but occassionally crashes with a "%DECthreads bugcheck (version V2.12-296), terminating execution.
% Running on OpenVMS VAX [OpenVMS V6.2; VAX 7000-640, 4 cpus, 512Mb]
% Reason: set_kernel: deadlock at _$22$DIA1:[CMARTL.SRC]CMA_MUTEX.C;1:2027"

My code has a primary thread to accept new connections. It then spawns a child thread per new connection. Each child thread serves a remote client. Each child thread gets TCP job requets from the client, then delivers the job my "database process" MBX. The "database process" returns job results to the child thread's MBX then, in turn, these results are sent via TCP to the client. The TCP messages from the client and the "job result" MBX messages from the "database server" are both trigger via ASTs.

If anyone can look through my code to offer suggestions, I would appreciate it. I am new to this, and have peiced this together from many different resources.


My code follows. I hope it is not inappropriate to post an entire program here....

Begin Code:
/********************************************************************
* To compile and link this server:
*
* $ CC T4
* $ LINK T4,SYS$INPUT:/OPT
* MULTINET:MULTINET_SOCKET_LIBRARY/SHARE
* SYS$SHARE:VAXCRTL/SHARE
* SYS$SHARE:CMA$LIB_SHR/SHARE
* ^Z
********************************************************************/

#include <ssdef.h>
#include <stsdef.h>
#include <descrip.h>
#include <psldef.h>
#include <lnmdef.h>
#include <lib$routines.h>
#include <starlet.h>
#include <iodef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include "multinet_root:[multinet.include.sys]types.h"
#include "multinet_root:[multinet.include.sys]socket.h"
#include "multinet_root:[multinet.include.sys]ioctl.h"
#include "multinet_root:[multinet.include.netinet]in.h"
#include "multinet_root:[multinet.include]netdb.h"
#include "multinet_root:[multinet.include.arpa]inet.h"
#include <pthread.h>
#include <dvidef.h>

#ifndef NULL /* Just in case this is not defined */
# define NULL ((void*)0)
#endif

struct request_buffer {
char SOH;
char clientName[20];
char destMbx[15];
int bornOnDate;
int timeout;
int receiveDate;
char replyDevice[20];
char commandBody[512];
char EOT;
};

struct reply_buffer {
char SOH;
char clientName[20];
char destMbx[15];
int bornOnDate;
int timeout;
int receiveDate;
char replyDevice[20];
unsigned short replyMessageLen;
char replyBody[19928];
};

struct request_AST {
struct request_buffer buf;
char isSet;
unsigned short iosb[4];
};

struct reply_AST {
struct reply_buffer buf;
unsigned short int clientSocket;
char isSet;
unsigned short iosb[4];
};

struct global_AST {
pthread_cond_t cond;
struct request_AST requestBuf;
struct reply_AST replyBuf;
char replyMbxName[30];
char replyDeviceName[10];
unsigned short int replyMbxChan;
};

unsigned int requestBufSize;
unsigned int replyBufSize;

/* Function prototypes */
void *serverProcess (void* arg);
void deleteMbx (char* mbxName);
void createMbx (char* commandBody);
void show_error (unsigned long int inStatus);
void processRequest (struct global_AST* requestBuf);
void processReply (struct global_AST* replyBuf);
char* now();

main()
{
const int MAX_CLIENTS = 200;
unsigned short serverSocket;
unsigned short clientSocket;
struct sockaddr_in addrServer;
struct sockaddr_in addrClient;
unsigned long int status;
pthread_t thread;
int clientCntr = 0;
int on=1;
int length;
struct request_buffer tmpRequestBuf;
struct reply_buffer tmpReplyBuf;

printf ("\nProcess started\n");
printf (" %s\n",now());

requestBufSize = sizeof(tmpRequestBuf);
replyBufSize = sizeof(tmpReplyBuf);

/********************************************************************
* CREATE AN IP-FAMILY SOCKET
********************************************************************/
serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket < 0) {
socket_perror("acitcpserver: socket");
exit(0x10000000);
}

/********************************************************************
* Set the "REUSEADDR" option on this socket. This will allow us
* to bind() to it EVEN if there already connections in progress
* on this port number. Otherwise, we would get an "Address already
* in use" error.
********************************************************************/
if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))
< 0) {
socket_perror("acitcpserver: setsockopt");
exit(0x10000000);
}

/********************************************************************
* Create a "sockaddr_in" structure which describes the port we
* want to listen to. Address INADDR_ANY means we will accept
* connections to any of our local IP addresses.
********************************************************************/
addrServer.sin_family = AF_INET;
addrServer.sin_addr.s_addr = INADDR_ANY;
addrServer.sin_port = htons(51111);

/********************************************************************
* Bind to that address...
********************************************************************/
if (bind(serverSocket, &addrServer, sizeof (addrServer)) < 0) {
socket_perror("acitcpserver: bind");
exit(0x10000000);
}

/********************************************************************
* Declare to the kernel that we want to listen for connections
* on this port, and that the kernel may queue up to MAX_CLIENTS such
* connections for us.
********************************************************************/
if (listen(serverSocket, MAX_CLIENTS) < 0) {
socket_perror("acitcpserver: listen");
exit(0x10000000);
}

/********************************************************************
* Now go into a loop, waiting for Client connections and processing
* messages.
********************************************************************/
for (;;) {
/********************************************************************
* Call accept to accept a new connection. This 'peels'
* a connection off of the original socket and returns to us
* a new socket to the connection. We could now close
* down the original socket if we didn't want to handle
* more connections.
********************************************************************/
length = sizeof(addrClient); /* Pass in the length */
clientSocket = accept(serverSocket, &addrClient, &length);
if (clientSocket < 0) {
socket_perror("acitcpserver: accept");
exit(0x10000000);
}

/********************************************************************
* `addrClient' will be a sockaddr_in structure describing the
* remote IP address (and port #) which the connection
* was made from. Before we start to echo data, write a
* string into the network describing this port.
********************************************************************/
printf ("New connection accepted:\n") ;
printf (" %s\n",now());
printf (" IPA : %s\n", inet_ntoa(addrClient.sin_addr)) ;
printf (" Socket : %d\n", clientSocket) ;
printf (" Port : %u\n\n", htons(addrClient.sin_port)) ;


pthread_create ( &thread, pthread_attr_default, serverProcess,
(void*)clientSocket );
clientCntr++;
}
}

void *serverProcess (void *arg) {

unsigned short clientSocket = (unsigned short)arg;
struct global_AST globalBuf;
int msgLength;
unsigned short int serverMbx;
unsigned long int status;
struct dsc$descriptor_s name_d;
pthread_t me;

static pthread_mutex_t myLock;
struct timespec abstime;
struct timespec deltatime;

deltatime.tv_sec = 30;
deltatime.tv_nsec = 0;

pthread_mutex_init(&myLock, pthread_mutexattr_default);
pthread_mutex_lock(&myLock);
pthread_cond_init(&globalBuf.cond, pthread_condattr_default);

me = pthread_self();
globalBuf.replyBuf.clientSocket = clientSocket;
name_d.dsc$b_class = DSC$K_CLASS_S;
name_d.dsc$b_dtype = DSC$K_DTYPE_T;
globalBuf.requestBuf.isSet = 0;
globalBuf.replyBuf.isSet = 0;
strcpy(globalBuf.replyMbxName,"NONE");

printf("Host thread started.\n");
printf (" %s\n",now());
for (;;) {
if (!globalBuf.requestBuf.isSet) {
globalBuf.requestBuf.isSet = 1;
status = sys$qio(0, clientSocket, IO$_READVBLK,
&globalBuf.requestBuf.iosb,
processRequest,
&globalBuf,
&globalBuf.requestBuf.buf,
requestBufSize, 0, 0, 0, 0);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_assign) Error setting socket read AST.\n") ;
show_error (status);
pthread_exit((void*)status);
}
}
if (!globalBuf.replyBuf.isSet &&
(strcmp(globalBuf.replyMbxName,"NONE") != 0)) {
globalBuf.replyBuf.isSet = 1;
status = sys$qio(0, globalBuf.replyMbxChan, IO$_READVBLK,
&globalBuf.replyBuf.iosb,
processReply,
&globalBuf,
&globalBuf.replyBuf.buf,
replyBufSize, 0, 0, 0, 0);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_assign) Error setting reply MBX read AST.\n") ;
show_error (status);
pthread_exit((void*)status);
}
}
printf("\n");
while (globalBuf.requestBuf.isSet == 1 &&
(globalBuf.replyBuf.isSet == 1 ||
(strcmp(globalBuf.replyMbxName,"NONE") == 0))) {
status = pthread_get_expiration_np(&deltatime, &abstime);
status = pthread_cond_timedwait(&globalBuf.cond,
&myLock,
&abstime);
}

/********************************************************************
* If Client Disconnected, clean up and terminate.
********************************************************************/
if (globalBuf.requestBuf.isSet == 9) {
if (strcmp(globalBuf.replyMbxName,"NONE") != 0) {
status = sys$dassgn(globalBuf.replyMbxChan);
if (!(status & STS$M_SUCCESS)) {
printf
("(sys_dassgn) Error dettaching replyMbxChan [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
}
}
pthread_mutex_unlock(&myLock);
printf ("Client thread terminated [%s].\n",globalBuf.replyMbxName);
printf (" %s",now());
pthread_exit((void*)status);
}
}

/********************************************************************
* socket_read() will return 0 on end-of-file, or -1 on error...
********************************************************************/
if (msgLength < 0) {
socket_perror("acitcpserver: read");
}

/********************************************************************
* Now close down the connection and terminate this thread
********************************************************************/
pthread_mutex_lock(&myLock);
printf ("Client has disconnected from socket [%s].\n",
globalBuf.replyMbxName);
printf (" %s",now());
socket_close(clientSocket);

return NULL;
}


void show_error (unsigned long int inStatus) {
short message_len;
char text[133] = "";
register tmpStatus;
struct dsc$descriptor_s message_text;

message_text.dsc$b_class = DSC$K_CLASS_S;
message_text.dsc$b_dtype = DSC$K_DTYPE_T;
message_text.dsc$w_length = 133;
message_text.dsc$a_pointer = text;
tmpStatus = sys$getmsg (inStatus, &message_len, &message_text, 15, 0);

/* Check for status conditions. */
if (tmpStatus == SS$_NORMAL) {
printf(" %.*s\n\n", message_len, text);
} else {
if (tmpStatus == SS$_BUFFEROVF) {
printf(" (show_error) BUFFER OVERFLOW -- Text is: %.*s\n\n",
message_len, text);
} else {
if (tmpStatus == SS$_MSGNOTFND) {
printf(" (show_error) MESSAGE NOT FOUND.\n\n");
printf(" SS$_STATUS: %d\n", inStatus);
} else {
printf(" (show_error) Unexpected error in $GETMSG call.\n\n");
printf(" SS$_STATUS: %d\n", inStatus);
}
}
}

}

void processRequest (struct global_AST* globalBuf) {

static unsigned long int status;
static struct dsc$descriptor_s name_d;
static short iosb[4];
static unsigned short int serverMbx;
static int write_function;
static struct timespec abstime;
static struct timespec deltatime;
static unsigned long int prot = 0xffff0000; /* Open permissions */
/* Data structures required for sys$trnlnm follow */
static unsigned long int index;
static unsigned long int max_index;
static const unsigned long int attributes = LNM$M_CASE_BLIND;
static const $DESCRIPTOR (table_name_d, "LNM$JOB");
static char equiv_name[255+1];
static struct dsc$descriptor_s equiv_name_d = { 0,
DSC$K_DTYPE_T,
DSC$K_CLASS_S,
equiv_name };
static struct {
unsigned short int length;
unsigned short int function;
void *buffer;
void *retlen;
} trnitms[] = { 4,
LNM$_INDEX,
&index,
NULL,
sizeof (equiv_name) - 1,
LNM$_STRING,
equiv_name,
&equiv_name_d.dsc$w_length,
4,
LNM$_MAX_INDEX,
&max_index,
NULL,
0, 0, NULL, NULL };
static const char access_mode = PSL$C_USER;


write_function = IO$_WRITEVBLK | IO$M_NOW | IO$M_NORSWAIT;
name_d.dsc$b_class = DSC$K_CLASS_S;
name_d.dsc$b_dtype = DSC$K_DTYPE_T;

/****************************************************************
* CHECK FOR CLIENT DISCONNECT
****************************************************************/
if (globalBuf->requestBuf.iosb[1] <= 0) {
printf ("Client Disconnected\n");
printf (" %s",now());
globalBuf->requestBuf.isSet = 9;
status = sys$dassgn(globalBuf->replyBuf.clientSocket);
if (!(status & STS$M_SUCCESS)) {
printf
("(sys_dassgn) Error dettaching client socket.\n");
show_error (status);
}
status = pthread_cond_signal(&globalBuf->cond);
return;
}

/****************************************************************
* PROCESS SERVER COMMAND
****************************************************************/
if (strncmp(globalBuf->requestBuf.buf.destMbx,"COMMAND",7) == 0) {
printf ("Server command received:\n") ;
printf (" %s\n",now());
printf (" clientName : %.20s\n",
globalBuf->requestBuf.buf.clientName) ;
if (strncmp((const char *)globalBuf->requestBuf.buf.commandBody,
"DELMBX:",7) == 0) {
deleteMbx (globalBuf->requestBuf.buf.commandBody);
} else {
if (strncmp((const char *)globalBuf->requestBuf.buf.commandBody,
"CREMBX:",7) == 0) {
createMbx (globalBuf->requestBuf.buf.commandBody);
} else {
printf (" Invalid command [%s].\n\n",
globalBuf->requestBuf.buf.commandBody);
}
}
} else {
/****************************************************************
* PROCESS NEW MESSAGE FROM CLIENT
****************************************************************/
printf ("Request received from client.\n") ;
printf (" %s\n",now());
/********************************************************************
* When the first message is received, attach to Reply MBX
* (Using clientName received in client msg).
* If first message was a DELMBX request, do not create MBX (client
* may be trying to delete it.
********************************************************************/
if (strcmp(globalBuf->replyMbxName,"NONE") == 0 &&
(strncmp(globalBuf->requestBuf.buf.commandBody,
"DELMBX",6) != 0)) {
strcpy(globalBuf->replyMbxName,globalBuf->requestBuf.buf.clientName);
name_d.dsc$b_class = DSC$K_CLASS_S;
name_d.dsc$b_dtype = DSC$K_DTYPE_T;

name_d.dsc$w_length = strlen(globalBuf->replyMbxName);
name_d.dsc$a_pointer = globalBuf->replyMbxName;
status = sys$crembx (0, /* 0=temp, 1=perm */
&globalBuf->replyMbxChan,
replyBufSize, /* Max message size */
(replyBufSize*6), /* Buffer space for messages */
prot,
PSL$C_USER,
&name_d,
0,
0);
if (!(status & STS$M_SUCCESS)) {
printf ("(sys_crembx) Error creating reply MBX [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
pthread_exit((void*)status);
} else {
printf (" (sys_crembx) Successfully created reply MBX [%s].\n",
name_d.dsc$a_pointer) ;
/* Resolve device name for temp MBX */
trnitms[0].length = sizeof (equiv_name) - 1;
status = sys$trnlnm (&attributes,
&table_name_d,
&name_d,
&access_mode,
trnitms);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_trnlnm) Error resolving MBX device [%s].\n\n",
name_d.dsc$a_pointer) ;
show_error (status);
pthread_exit((void*)status);
} else {
printf (" (sys_trnlnm) MBX device is [%s].\n",
equiv_name_d.dsc$a_pointer) ;
strcpy(globalBuf->replyDeviceName,equiv_name_d.dsc$a_pointer);
}
}
}

strcpy(globalBuf->requestBuf.buf.replyDevice,
globalBuf->replyDeviceName);
/* Write current epoch to receiveDate */
deltatime.tv_sec = 0;
deltatime.tv_nsec = 0;
status = pthread_get_expiration_np(&deltatime, &abstime);
if (status != 0) {
printf (" (sys_assign) Error retrieving sys time.\n");
show_error (status);
return;
}
globalBuf->requestBuf.buf.receiveDate = abstime.tv_sec;
printf (" clientName : %.20s\n", globalBuf->requestBuf.buf.clientName) ;
printf (" destMbx : %.15s\n", globalBuf->requestBuf.buf.destMbx) ;
printf (" bornOnDate : %d\n", globalBuf->requestBuf.buf.bornOnDate) ;
printf (" timeout : %d\n", globalBuf->requestBuf.buf.timeout) ;
printf (" receiveDate : %d\n", globalBuf->requestBuf.buf.receiveDate) ;
printf (" commandBody : <%.512s>\n",
globalBuf->requestBuf.buf.commandBody) ;

/****************************************************************
* ATTACH DESTINATION SERVER MAILBOX
****************************************************************/
name_d.dsc$w_length = strlen(globalBuf->requestBuf.buf.destMbx);
name_d.dsc$a_pointer = globalBuf->requestBuf.buf.destMbx;
status = sys$assign (&name_d, &serverMbx, NULL, &name_d);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_assign) Error attaching to mailbox [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
} else {
printf (" (sys_assign) Sucessfully attached to mailbox [%s].\n",
name_d.dsc$a_pointer) ;

/****************************************************************
* WRITE TO APPLICABLE SERVER MAILBOX
****************************************************************/
strncpy(globalBuf->requestBuf.buf.clientName,
globalBuf->requestBuf.buf.clientName,20);
status = sys$qiow (0, serverMbx, write_function, iosb, NULL, 0,
&globalBuf->requestBuf.buf, requestBufSize, 0, 0, 0, 0) ;
if (status & STS$M_SUCCESS) status = iosb[0] ;
if (!(status & STS$M_SUCCESS)) {
printf
(" (mbx_write) Error writing to mailbox [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
} else {
printf (" Mailbox write succeeded.\n");
}

/****************************************************************
* DETTACH FROM DESTINATION SERVER MAILBOX
****************************************************************/
status = sys$dassgn (serverMbx);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_dassgn) Error dettaching from server mailbox [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
}
}
}
printf(" processRequest complete!\n\n");
globalBuf->requestBuf.isSet = 0;
status = pthread_cond_signal(&globalBuf->cond);
}

void processReply (struct global_AST* globalBuf) {

static unsigned long int status;
static unsigned short int cntr;
static char tmpString[383];
static struct dsc$descriptor_s name_d;
static struct timespec abstime;
static struct timespec deltatime;

/****************************************************************
* CHECK FOR MAILBOX CLOSED
****************************************************************/
if (globalBuf->replyBuf.iosb[1] <= 0) {
printf ("Mailbox closed.\n");
printf (" %s",now());
status = pthread_cond_signal(&globalBuf->cond);
return;
}

/****************************************************************
* PROCESS REPLY FROM REMOTE SERVER
****************************************************************/
printf ("Reply received from remote server.\n") ;
printf (" %s\n",now());
printf (" clientName : %.20s\n", globalBuf->replyBuf.buf.clientName) ;
printf (" destMbx : %.15s\n", globalBuf->replyBuf.buf.destMbx) ;
printf (" bornOnDate : %d\n", globalBuf->replyBuf.buf.bornOnDate) ;
printf (" timeout : %d\n", globalBuf->replyBuf.buf.timeout) ;
printf (" receiveDate : %d\n", globalBuf->replyBuf.buf.receiveDate) ;
printf (" replyLen : %d\n", globalBuf->replyBuf.buf.replyMessageLen) ;
/* Copy replyBody to tmpString, replacing nulls with spaces */
for (cntr = 0; cntr < globalBuf->replyBuf.buf.replyMessageLen
&& cntr < 379; cntr++) {
if (globalBuf->replyBuf.buf.replyBody[cntr] == 0) {
tmpString[cntr] = ' ';
} else {
tmpString[cntr] = globalBuf->replyBuf.buf.replyBody[cntr];
}
}
tmpString[cntr] = 0;
tmpString[379] = '.';
tmpString[380] = '.';
tmpString[381] = '.';
tmpString[382] = 0;
printf (" replyBody : <%s>\n", tmpString);

/* Check message age, to see if it has expired */
deltatime.tv_sec = 0;
deltatime.tv_nsec = 0;
status = pthread_get_expiration_np(&deltatime, &abstime);
if (status != 0) {
printf (" (sys_assign) Error retrieving sys time.\n");
show_error (status);
return;
}
if (abstime.tv_sec - globalBuf->replyBuf.buf.receiveDate >
globalBuf->replyBuf.buf.timeout) {
printf (" Message is being discarded due to expiration time!\n");
} else {

/****************************************************************
* Write TCP message to client
****************************************************************/
printf (" Sending reply message.\n");
status = socket_write(globalBuf->replyBuf.clientSocket,
(void *)&globalBuf->replyBuf.buf,
globalBuf->replyBuf.buf.replyMessageLen+71);

if (status == -1) {
printf (" (socket_write) Error writting to socket.\n");
printf("Socket Error: %s\n",vms_errno_string());
} else {
printf (" Message delivered to client.\n");
}

}

/* Clear Reply Buffer */
for (cntr = 0; cntr < replyBufSize; cntr++) {
((char *)&globalBuf->replyBuf.buf)[cntr] = 0;
}

printf(" processReply complete!\n\n");
globalBuf->replyBuf.isSet = 0;
status = pthread_cond_signal(&globalBuf->cond);
}


void deleteMbx (char * commandBody) {
/***********************************************************
* DELETE MAILBOX
***********************************************************/

char* token;
char* sourceString;
unsigned long int status;
static unsigned short int channel;
struct dsc$descriptor_s name_d;

printf (" command : DELMBX\n") ;
sourceString = commandBody;
token = strtok (sourceString, ":");
token = strtok (NULL, ":");
printf (" Mailbox : %s\n",token) ;
/***********************************************************
* Attach channel to the mailbox to be deleted
***********************************************************/
name_d.dsc$w_length = strlen(token);
name_d.dsc$a_pointer = token;
name_d.dsc$b_class = DSC$K_CLASS_S;
name_d.dsc$b_dtype = DSC$K_DTYPE_T;
status = sys$assign (&name_d, &channel, NULL, &name_d);
if (!(status & STS$M_SUCCESS)) {
printf
(" (sys_assign) Error attaching to mailbox to delete [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
return;
}
/***********************************************************
* Delete the mailbox
***********************************************************/
status = sys$delmbx (channel);
if (!(status & STS$M_SUCCESS)) {
printf
(" (sys_assign) Error deleteing mailbox [%s].\n",
name_d.dsc$a_pointer) ;
show_error (status);
return;
} else {
printf (" Mailbox delete succeeded.\n");
}
/***********************************************************
* Deattach to the mailbox to be deleted
***********************************************************/
status = sys$dassgn (channel);
if (!(status & STS$M_SUCCESS)) {
printf
(" (sys_assign) Error deassigning deleted mailbox channel.\n");
show_error (status);
return;
}
}

void createMbx (char * commandBody) {
/********************************************************************
* CREATE MAILBOX: This call will create a remote server MBX.
* Client MBXs are created as needed (and are a different size
* than this function supports).
********************************************************************/

unsigned long int status;
char* token;
char* sourceString;
static unsigned short int channel;
struct dsc$descriptor_s name_d;

static unsigned long int prot = 0xffff0000; /* Open permissions */

sourceString = commandBody;
token = strtok (sourceString, ":");
printf (" Mailbox : %s\n",token) ;
token = strtok (NULL, ":");
printf (" Mailbox : %s\n",token) ;
name_d.dsc$w_length = strlen(token);
name_d.dsc$a_pointer = token;
name_d.dsc$b_class = DSC$K_CLASS_S;
name_d.dsc$b_dtype = DSC$K_DTYPE_T;
status = sys$crembx (1, /* 0=temp, 1=perm */
&channel,
requestBufSize, /* Max message size */
requestBufSize*100, /* Buffer space for messages */
prot,
PSL$C_USER,
&name_d,
0,
0);
if (!(status & STS$M_SUCCESS)) {
printf (" (sys_dassgn) Error creating mailbox.\n");
show_error (status);
} else {
printf (" Mailbox create succeeded [%s]\n", name_d.dsc$a_pointer);
}
}

char* now() {
time_t rawtime;
struct tm * timeinfo;

time ( &rawtime );
timeinfo = localtime ( &rawtime );
return asctime (timeinfo);
}

Craig



Sponsored Link
Ads by Google

Response Number 1
Name: WillemGrooters
Date: August 16, 2004 at 01:02:04 Pacific
Reply:

First of all: you should be aware that threads and AST's don't go well together. AST's are delivered to a process and since all threads of an application run in the same context as their mother-tread, any of these can handle the AST set up by ANY thread in that context. There is no way to influence that! You can get "funny" behaviour thatw ay... Subprocesses do NOT have that problem - and are (IMHO) a much better way to implement 'threads' in VMS than (Unix-based) threading.
Now your problem.
A deadlock occurs if one resource-wait waits for another:
Proces (or thread) A requests resource X - gets it since it isn't blocked.
Proces (or thread) B requests resource Y - gets it since it isn't blocked.
Proces (or thread) A requests resource Y - but has to wait since it is blocked.
Proces (or thread) B requests resource X - but has to wait since it is blocked
.
This is a deadlock: Process A waits for process B and B for A.

You could prevent this in serveral ways:
1. ALWAYS request resources in the same order:
A requests X - gets it since it isn't blocked.
B requests X - has to wait since it's blocked.

Now process B can olnly contuinue when A releases X.

2. Create a 'critical path'.
A locks R1, requests X and gets it.
B requests R1, but has to wait.
A releases rource X and R1. Now B gets R1, and can do whatever is required.
Etc.
Actually the same as above, but now you control it yourself.
Locking can be done using the $ENQ system service, releaseing a lock (don't forgte that one!) using $DEQ (or $ENQ converting a lock)
Use the SYNCHRONOUS version here ($ENQW) if waiting is the only appropiate action. Use $ENQ but NO AST, just check the status. It will be somethinmg like "Not granted" so you can do something else.

Willem


Willem Grooters


0

Response Number 2
Name: craigers1
Date: August 16, 2004 at 09:10:04 Pacific
Reply:

Thanks a ton for your response. It's tough having no local resources to go for help. Given your reply, I have been trying to figure out how this deadlock applies to "my code". I had "assumed" there was very little resource contention in my code, as each thread has it's own socket to listen on, and the output MBX (which is shared), would not be contended for, as a thread should finish its job before being preempted (the only interrupts are of the same priority, thus, there should be no preemption).

However, your opening statments may reveal the root problem. I had further "assumed", that when a thread issued an AST to read from its own socket, that only that thread would be awakened when his own socket became readable (or reply MBX for that matter). Your response suggests that another thread may be have equal probabiltiy of being awoke in response to this data arrival. If so, then this may explain the deadlocks. Even then, my system seems to behave nicely for quite a while before this deadlock occurs (it seems like each thread is processing its own ASTs). It would seem that I would have many more failures if random threads were processing these ASTs (i.e. the data would frequently not be processed).

I think I will try to add some debug statements to show which threads are reacting to which requests. Maybe the ASTs are only occasionally being mishandled... We'll see.

Finally, if I must, I will pursue coding this using subprocesses. I have to figure out how to pass the socket information from the parent process to the subprocess--that is new territory for me.


Thanks!
Craig


0

Response Number 3
Name: craigers1
Date: August 16, 2004 at 10:52:33 Pacific
Reply:

I have figured out how to pass variables to the subprogram. I think I will pursue that implementation next.

Thanks,
Craig


0

Response Number 4
Name: craigers1
Date: August 17, 2004 at 09:07:37 Pacific
Reply:

I have the framework for a Subprocess architecture. My current problem is that I cannot detect the remote client's disconnect. My subprocess is waiting for TCP data "SYS$QIOW". When the remote client disconnects, this process remains at the QIOW (a potention process leak).

Thanks for any advice,
Craig


0

Response Number 5
Name: WillemGrooters
Date: August 19, 2004 at 00:54:22 Pacific
Reply:

Craig,
Exit of a subprocess or thread may depend on what reason. The "unix" way - just wait for a given period - can be used, but then use $QIO - combined with a timer:

$GETEF (for timer) gives you timer_ef
$GETEF (for QIO) gives you qio_ef
$CLREF (both of them)
$SETTMR with AST (this sets timer_ef)
$QIO with completion AST (this sets qio_ef)
$WFLOR (Check the doc for this)
Check event flag
If timer_ef is set then qio failure - subprocess likely to be gone
if qio_ef is set then it has completed, then cancel the timer (there is a system service for that)

A better approach is communication between main- and subprocess.
One approach may requires some extra in the subproces: Create and install an exception handler that signals the error to the creator. Secondly install an exit handler that sends an "ABORT" message. In that case, the listener will get an IO (unless quota has been exeeded and this cannot be sent...)

Yet another way:
Main process has a mailbox it listens to, which is used by all subprocesses.
It creates the subproces.
First thing the subproces does, is creating a temporary mailbox to listen to. It then sends the name (the logical name, or the device) to the main process.
When a message is to be sent to the subprocess, the device is checked for existance. If it doesn't exist, the subprocess is gone. If if does, a channel is assigned to the mailbox, and after this has been sent, the channel is deassigned (if you keep it assigned, the device will stay in existance).


(For "subproces" you can read "thread", for "mailbox" you can read "socket". But I would prefer the VMS way (RMS-based mailboxes since you can name them (and it must be unique)

Willem


Willem Grooters


0

Related Posts

See More



Response Number 6
Name: craigers1
Date: August 25, 2004 at 06:11:53 Pacific
Reply:

Thanks again for your responses. I finally have some code ready for production testing. In the meantime, my client machine (Unix server), has started acting up. As Murphy would have it, I have to wait until this new issue is resolved before I can begin testing.

Thanks!!
Craig


0

Response Number 7
Name: Bob Gezelter
Date: September 28, 2004 at 14:00:23 Pacific
Reply:

Craig,

I must respectfully disagree with Willem on the use of event flags and WFLOR. I have seen more than a few problems with the approach, particularly in high use situations.

These failures fall into different categories, including:
- race conditions causing; often causing hangs
- resource problems centered around the limited number of event flags
- other strange problems

Using the AST mechanism as intended avoids both the race conditions (by being implicitly synchronized) and the resource problems (there are quota limits on ASTs, but their maximum value is far greater than the limited number of event flags).

I am somewhat behind (large understatement) on posting my presentations on my www site, but you may find the following helpful:

Events and Threads
- http://www.rlgsc.com/hpets/2002/1228.html

Introduction to OpenVMS AST Programming
- http://www.rlgsc.com/cets/2000/435.html

OpenVMS AST Internals & an earlier Threads vs. Events
- http://www.rlgsc.com/decus/usf96/index.html

I have more extensive materials, but I haven't gotten around to finishing the back postings and indices on the www site.

I hope that the above is helpful.

- Bob


0

Sponsored Link
Ads by Google
Reply to Message Icon






Post Locked

This post is quite old and has been locked from receiving new replies. Please create a new posting instead.


Go to OpenVMS Forum Home


Sponsored links

Ads by Google


Results for: VMS Threads/ASTs... Problems

sftp from vms to linux www.computing.net/answers/openvms/sftp-from-vms-to-linux/493.html

Open VMS, help me please www.computing.net/answers/openvms/open-vms-help-me-please/458.html

Problem with ana/err & erfmscp.exe www.computing.net/answers/openvms/problem-with-anaerr-amp-erfmscpexe/267.html