more tidy ups of the libmq library

This commit is contained in:
Fish 2005-06-22 10:53:05 +00:00
parent d2769ce98a
commit a3dadddbba
18 changed files with 144 additions and 392 deletions

4
.gitattributes vendored
View file

@ -58,11 +58,9 @@ confuse/lexer.l -text
/depcomp -text
/install-sh -text
lib/include/buffer.h -text
lib/include/defines.h -text
lib/include/libmq.h -text
lib/include/packet.h -text
lib/include/simpleif.h -text
lib/include/sock.h -text
lib/include/xds.h -text
lib/include/xds.h.in -text
lib/include/xds_engine_xdr_mqs.h -text
@ -73,7 +71,7 @@ lib/src/buffer.c -text
lib/src/decodepacket.c -text
lib/src/encodepacket.c -text
lib/src/packet.c -text
lib/src/sock.c -text
lib/src/simpleif.c -text
lib/src/xds.c -text
lib/src/xds_engine_xdr.c -text
lib/src/xds_engine_xdr_mqs.c -text

View file

@ -27,9 +27,7 @@
#define DEFINES_H
#include <stddef.h>
#define _GNU_SOURCE
#include <stdio.h>
#undef _GNU_SOURCE
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>

View file

@ -26,6 +26,7 @@
#ifndef SERVERSOCK_H
#define SERVERSOCK_H
#include "libmq.h"
#include "defines.h"
#include "event.h"
#include "list.h"

View file

@ -36,8 +36,9 @@
#include <ncurses.h>
#include <term.h>
#include "defines.h"
#include "libmq.h"
#include "packet.h"
#include "simpleif.h"
#include "log.h"
void pck_logger(char *fmt,...);

View file

@ -1,120 +0,0 @@
/* NeoStats - IRC Statistical Services
** Copyright (c) 1999-2004 Adam Rutter, Justin Hammond
** http://www.neostats.net/
**
** Portions Copyright (c) 2000-2001 ^Enigma^
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
** USA
**
** NeoStats CVS Identification
** $Id$
*/
#ifndef DEFINES_H
#define DEFINES_H
#define _GNU_SOURCE 1
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <unistd.h>
#include <time.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/time.h>
#include <ctype.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include <setjmp.h>
#include <assert.h>
#include "config.h"
#include "list.h"
#include "hash.h"
/* Temp disable for upcoming release until all external modules
* have been released with warnings fixed
*/
#if 0
#define __attribute__(x) /* NOTHING */
#else
/* If we're not using GNU C, elide __attribute__ */
#ifndef __GNUC__
#define __attribute__(x) /* NOTHING */
#endif
#endif
#define BUFSIZE 512
#ifndef MAXHOST
#define MAXHOST 128
#endif
#ifndef MAXPASS
#define MAXPASS 32
#endif
#ifndef MAXUSER
#define MAXUSER 15
#endif
#ifndef MAXCONNECTIONS
#define MAXCONNECTIONS 10
#endif
/* MAXPATH
* used to determine buffer sizes for file system operations
*/
#ifndef MAXPATH
#define MAXPATH 1024
#endif /* MAXPATH */
#define bzero(x, y) memset(x, '\0', y);
#define NS_SUCCESS 1
#define NS_FAILURE -1
typedef enum LOG_LEVEL {
LOG_CRITICAL=5,
LOG_WARNING,
LOG_NORMAL,
LOG_INFO,
} LOG_LEVEL;
typedef enum DEBUG_LEVEL {
DBG1=1,
DBG2,
DBG3,
DBG4,
} DEBUG_LEVEL;
/* logging defines */
#ifdef DEBUG
#define TRACE(w,x,...) dotrace(w, x,__FILE__, __LINE__, __PRETTY_FUNCTION__,__VA_ARGS__)
#else
#define TRACE(x,y,...);
#endif
#endif

View file

@ -23,8 +23,8 @@
** $Id$
*/
#ifndef DEFINES_H
#define DEFINES_H
#ifndef LIBMQ_H
#define LIBMQ_H
#define _GNU_SOURCE 1
@ -94,17 +94,17 @@
#define NS_FAILURE -1
typedef enum LOG_LEVEL {
LOG_CRITICAL=5,
LOG_WARNING,
LOG_NORMAL,
LOG_INFO,
MQLOG_CRITICAL=5,
MQLOG_WARNING,
MQLOG_NORMAL,
MQLOG_INFO,
} LOG_LEVEL;
typedef enum DEBUG_LEVEL {
DBG1=1,
DBG2,
DBG3,
DBG4,
MQDBG1=1,
MQDBG2,
MQDBG3,
MQDBG4,
} DEBUG_LEVEL;
/* logging defines */

View file

@ -26,7 +26,7 @@
#ifndef PACKET_H
#define PACKET_H
#include "defines.h"
#include "libmq.h"
#include "xds.h"
#include "xds_engine_xdr_mqs.h"
#include "buffer.h"
@ -74,9 +74,9 @@
#define PCK_CLNTCAP_FMT "int32 int32 string"
#define PCK_AUTH_FMT "string string"
#define PCK_JOINQUEUE_FMT "string int32 string"
#define PCK_JOINQUEUE_FMT "string int32 string"
#define PCK_SENDTOQUEUE_FMT "string int32 string octet"
#define PCK_QUEUEINFO_FMT "string int32 string"
#define PCK_QUEUEINFO_FMT "string int32 string"
#define PCK_MSGFROMQUEUE_FMT "string string octet int32 int32 string"
/* packet flags, not message flags */
@ -155,7 +155,7 @@ typedef struct mqpacket {
xds_t *xdsout;
} mqpacket;
typedef void (logfunc)(char *fmt, ...) __attribute__((format(printf,1,2))); /* 3=format 4=params */
typedef void (logfunc)(int level, char *fmt, ...) __attribute__((format(printf,2,3))); /* 3=format 4=params */
typedef int (connectauthfunc)(void *, mqpacket *);
typedef int (callbackfunc)(void *, mqpacket *);
@ -173,14 +173,10 @@ typedef struct mqp {
connectauthfunc *connectauth;
myeng *myengines;
callbackfunc *callback;
int debug;
int loglvl;
} mqp;
/* defines for the structentry type fields */
#define STR_PSTR 1
#define STR_STR 2
#define STR_INT 3
typedef struct {
int type;
size_t size;
@ -194,20 +190,15 @@ typedef struct {
void *(*mqlib_malloc)(size_t);
void (*mqlib_free)(void *);
extern int encode_mqs_header (xds_t * xds, void *engine_context, void *buffer, size_t buffer_size, size_t * used_buffer_size, va_list * args);
extern int decode_mqs_header (xds_t * xds, void *engine_context, void *buffer, size_t buffer_size, size_t * used_buffer_size, va_list * args);
void pck_set_logger(mqp *, logfunc *logger);
void pck_set_callback(mqp *, callbackfunc *);
void pck_set_authcallback(mqp *, connectauthfunc *);
void pck_set_data(mqpacket *, void *);
void pck_set_dbglvl(mqp *, DEBUG_LEVEL);
mqp *init_mqlib ();
void fini_mqlib(mqp *mqplib);
int read_fd (mqp *mqplib, mqpacket *mqp);
int close_fd (mqp *mqplib, mqpacket * mqp);
int write_fd (mqp *mqplib, mqpacket * mqp);
@ -225,18 +216,10 @@ unsigned long pck_send_queueinfo(mqp *mqplib, mqpacket *mqpck, char *queue, char
unsigned long pck_send_queue_mes(mqp *mqplib, mqpacket *mqpck, char *queue, char *topic, void *data, size_t len, unsigned long messid, long timestamp, char *from);
xds_t * pck_init_engines (mqp *mqplib, int type, int direction);
/* this is the standalone un-threadsafe interface */
typedef int (actioncbfunc)(int, void *);
int init_socket(actioncbfunc *);
int debug_socket(int i);
int enable_server(int port);
int pck_process ();
int pck_make_connection (char *hostname, char *, char *, long , void *cbarg, actioncbfunc *);
unsigned long pck_simple_send_message_struct(int conid, structentry *mystruct, int cols, void *data, char *destination, char *topic);
unsigned long pck_simple_joinqueue(int conid, char *queue, int flags, char *filter);
mq_data_joinqueue *pck_get_queueinfo(int conid);
mq_data_senddata *pck_get_msgfromqueue(int conid);
int pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *target);
void dotrace(mqp *mqplib, DEBUG_LEVEL level, char *fname, int lno, char *fnct, char *fmt, ...) __attribute__((format(printf,6,7)));
void MQLOG(mqp *mqplib, LOG_LEVEL level, char *fmt, ...) __attribute__((format(printf,3,4)));
/* these are error defines */
#define PCK_ERR_BUFFULL 1
@ -246,24 +229,4 @@ int pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, vo
/* these are the simple callback types */
#define PCK_SMP_LOGINOK 1
#define PCK_SMP_QUEUEINFO 2
#define PCK_SMP_MSGFROMQUEUE 3
#define PCK_SMP_CLNTCAPREJ -1
#define PCK_SMP_AUTHREJ -2
#endif

View file

@ -26,6 +26,8 @@
#ifndef SOCK_H
#define SOCK_H
#include "libmq.h"
#define MQS_S_FLAG_GOTSRVCAP 0x01
#define MQS_S_FLAG_SENTAUTH 0x02
#define MQS_S_FLAG_CONNECTOK 0x04

View file

@ -1,78 +0,0 @@
/* NeoStats - IRC Statistical Services
** Copyright (c) 1999-2004 Adam Rutter, Justin Hammond
** http://www.neostats.net/
**
** Portions Copyright (c) 2000-2001 ^Enigma^
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
** USA
**
** NeoStats CVS Identification
** $Id$
*/
#ifndef SOCK_H
#define SOCK_H
#define MQS_S_FLAG_GOTSRVCAP 0x01
#define MQS_S_FLAG_SENTAUTH 0x02
#define MQS_S_FLAG_CONNECTOK 0x04
#define MQS_S_FLAG_SET_GOTSRVCAP(x) (x->flags |= MQS_S_FLAG_GOTSRVCAP)
#define MQS_S_FLAG_IS_GOTSRVCAP(x) (x->flags & MQS_S_FLAG_GOTSRVCAP)
#define MQS_S_FLAG_SET_SENTAUTH(x) (x->flags = MQS_S_FLAG_SENTAUTH)
#define MQS_S_FLAG_IS_SENTAUTH(x) (x->flags & MQS_S_FLAG_SENTAUTH)
#define MQS_S_FLAG_SET_CONNECTOK(x) (x->flags = MQS_S_FLAG_CONNECTOK)
#define MQS_S_FLAG_IS_CONNECTOK(x) (x->flags & MQS_S_FLAG_CONNECTOK)
/* these are the simple callback types */
#define PCK_SMP_LOGINOK 1
#define PCK_SMP_QUEUEINFO 2
#define PCK_SMP_MSGFROMQUEUE 3
#define PCK_SMP_CLNTCAPREJ -1
#define PCK_SMP_AUTHREJ -2
/* defines for the structentry type fields */
#define STR_PSTR 1
#define STR_STR 2
#define STR_INT 3
/* this is the standalone un-threadsafe interface */
typedef int (actioncbfunc)(int, void *);
int init_socket(actioncbfunc *);
int debug_socket(int i);
int enable_server(int port);
int pck_process ();
int pck_make_connection (char *hostname, char *, char *, long , void *cbarg, actioncbfunc *);
unsigned long pck_simple_send_message_struct(int conid, structentry *mystruct, int cols, void *data, char *destination, char *topic);
unsigned long pck_simple_joinqueue(int conid, char *queue, int flags, char *filter);
mq_data_joinqueue *pck_get_queueinfo(int conid);
mq_data_senddata *pck_get_msgfromqueue(int conid);
int pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *target);
#endif

View file

@ -1,6 +1,6 @@
lib_LTLIBRARIES = libmqpacket.la
libmqpacket_la_SOURCES = packet.c decodepacket.c encodepacket.c\
xds.c xds_engine_xdr.c xds_engine_xml.c xds_version.c \
xds_engine_xdr_mqs.c sock.c buffer.c
xds_engine_xdr_mqs.c simpleif.c buffer.c
INCLUDES = -I../include
include $(top_srcdir)/rules.mk

View file

@ -61,7 +61,7 @@ LTLIBRARIES = $(lib_LTLIBRARIES)
libmqpacket_la_LIBADD =
am_libmqpacket_la_OBJECTS = packet.lo decodepacket.lo encodepacket.lo \
xds.lo xds_engine_xdr.lo xds_engine_xml.lo xds_version.lo \
xds_engine_xdr_mqs.lo sock.lo buffer.lo
xds_engine_xdr_mqs.lo simpleif.lo buffer.lo
libmqpacket_la_OBJECTS = $(am_libmqpacket_la_OBJECTS)
DEFAULT_INCLUDES = -I. -I$(srcdir) -I$(top_builddir)/Include
depcomp = $(SHELL) $(top_srcdir)/depcomp
@ -206,7 +206,7 @@ xds_uint8_t = @xds_uint8_t@
lib_LTLIBRARIES = libmqpacket.la
libmqpacket_la_SOURCES = packet.c decodepacket.c encodepacket.c\
xds.c xds_engine_xdr.c xds_engine_xml.c xds_version.c \
xds_engine_xdr_mqs.c sock.c buffer.c
xds_engine_xdr_mqs.c simpleif.c buffer.c
INCLUDES = -I../include
LINK = $(LIBTOOL) --tag=CXX --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
@ -275,7 +275,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/decodepacket.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/encodepacket.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/packet.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sock.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/simpleif.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/xds.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/xds_engine_xdr.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/xds_engine_xdr_mqs.Plo@am__quote@

View file

@ -30,7 +30,7 @@
/* needed for crc32 function */
#include <zlib.h>
#include "defines.h"
#include "libmq.h"
#include "list.h"
#include "packet.h"
#include "xds.h"
@ -48,8 +48,7 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
printf("%d\n", mqp->inbuf->off);
#endif
if (xds_setbuffer (mqp->xdsin, XDS_LOAN, mqp->inbuf->buffer, mqp->inbuf->off) != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("XDS setbuffer Failed");
MQLOG(mqplib, MQLOG_WARNING, "XDS setbuffer Failed");
/* XXX drop client ? */
return NS_FAILURE;
}
@ -72,26 +71,22 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
usedbuf = xds_get_usedbuffer (mqp->xdsin);
break;
case XDS_ERR_UNDERFLOW:
if (mqplib->logger)
mqplib->logger ("XDS Decode of Header Failed. Buffer Underflow");
MQLOG(mqplib, MQLOG_INFO, "XDS Decode of Header Failed. Buffer Underflow");
/* don't consume any buffer */
return -2;
default:
#ifdef PACKDEBUG
printf("%s\n", mqp->inbuf->buffer);
TRACE(mqplib, MQDBG4, "PackDebug: %s", mqp->inbuf->buffer);
#endif
if (mqplib->logger)
mqplib->logger ("XDS Decode of Header Failed: %d", rc);
MQLOG(mqplib, MQLOG_WARNING,"XDS Decode of Header Failed: %d", rc);
print_decode(mqp, 1);
/* drop client */
return NS_FAILURE;
}
if (mqplib->logger)
mqplib->logger ("Got Packet Decode: mid %d msgtype %d Version %d flags %d ", mqp->inmsg.MID, mqp->inmsg.MSGTYPE, mqp->inmsg.VERSION, mqp->inmsg.flags);
TRACE(mqplib, MQDBG1, "Got Packet Decode: mid %d msgtype %d Version %d flags %d ", mqp->inmsg.MID, mqp->inmsg.MSGTYPE, mqp->inmsg.VERSION, mqp->inmsg.flags);
/* check the version number */
if (mqp->inmsg.VERSION != 1) {
if (mqplib->logger)
mqplib->logger ("Invalid Protocol Version recieved");
MQLOG(mqplib, MQLOG_WARNING, "Invalid Protocol Version recieved");
/* XXX drop client ? */
return NS_FAILURE;
}
@ -125,8 +120,7 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
rc = xds_decode(mqp->xdsin, PCK_MSGFROMQUEUE_FMT, &mqp->inmsg.data.sendmsg.queue, &mqp->inmsg.data.sendmsg.topic, &mqp->inmsg.data.sendmsg.data, &mqp->inmsg.data.sendmsg.len, &mqp->inmsg.data.sendmsg.messid, &mqp->inmsg.data.sendmsg.timestamp, &mqp->inmsg.data.sendmsg.from);
break;
default:
if (mqplib->logger)
mqplib->logger ("Invalid MsgType Recieved");
MQLOG(mqplib, MQLOG_WARNING,"Invalid MsgType Recieved");
/* XXX drop client */
return NS_FAILURE;
}
@ -135,13 +129,10 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
usedbuf = xds_get_usedbuffer (mqp->xdsin);
break;
case XDS_ERR_UNDERFLOW:
if (mqplib->logger)
mqplib->logger ("XDS Decode of Data Failed. Buffer Underflow");
/* don't consume any buffer */
return -2;
default:
if (mqplib->logger)
mqplib->logger ("XDS Decode of Data Failed: %d", rc);
MQLOG(mqplib, MQLOG_WARNING, "XDS Decode of Data Failed: %d", rc);
/* drop client */
print_decode(mqp, 1);
return NS_FAILURE;
@ -153,13 +144,11 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
usedbuf = xds_get_usedbuffer (mqp->xdsin);
break;
case XDS_ERR_UNDERFLOW:
if (mqplib->logger)
mqplib->logger ("XDS Decode of Footer Failed. Buffer Underflow");
MQLOG(mqplib, MQLOG_INFO, "XDS Decode of Footer Failed. Buffer Underflow");
/* don't consume any buffer */
return -2;
default:
if (mqplib->logger)
mqplib->logger ("XDS Decode of Footer Failed: %d", rc);
MQLOG(mqplib, MQLOG_WARNING, "XDS Decode of Footer Failed: %d", rc);
/* drop client */
print_decode(mqp, 1);
return NS_FAILURE;
@ -202,8 +191,7 @@ pck_parse_packet (mqp *mqplib, mqpacket * mqp)
(mqlib_free)(mqp->inmsg.data.sendmsg.from);
break;
default:
if (mqplib->logger)
mqplib->logger ("Invalid MsgType Recieved");
MQLOG(mqplib, MQLOG_WARNING, "Invalid MsgType Recieved");
}
/* finished processing the message. grab what buffer we consumed and return */
return usedbuf;

View file

@ -28,9 +28,10 @@
#include <arpa/inet.h>
#include "defines.h"
#include "libmq.h"
#include "list.h"
#include "packet.h"
#include "simpleif.h"
#define SRVCAP1 2
#define SRVCAP2 3
@ -55,15 +56,12 @@ int pck_prepare(mqp *mqplib, mqpacket *mqp, int type) {
rc = xds_encode (mqp->xdsout, "mqpheader", mqp);
}
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode header failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode header failed %d.", rc);
return NS_FAILURE;
}
return NS_SUCCESS;
} else {
if (mqplib->logger) {
mqplib->logger("Outbound Message Not clear %d", mqp->outmsg.MID);
}
MQLOG(mqplib, MQLOG_CRITICAL, "Outbound Message Not clear %d", mqp->outmsg.MID);
return NS_FAILURE;
}
}
@ -83,8 +81,7 @@ unsigned long pck_send_ack(mqp *mqplib, mqpacket *mqp, int MID) {
rc = xds_encode (mqp->xdsout, PCK_ACK_FMT, MID);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode ack failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode ack failed %d.", rc);
return NS_FAILURE;
}
@ -107,8 +104,7 @@ unsigned long pck_send_srvcap(mqp *mqplib, mqpacket *mqp) {
rc = xds_encode (mqp->xdsout, PCK_SRVCAP_FMT, SRVCAP1, SRVCAP2, srvcap);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode srvcap failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode srvcap failed %d.", rc);
return NS_FAILURE;
}
@ -131,8 +127,7 @@ unsigned long pck_send_clntcap(mqp *mqplib, mqpacket *mqp) {
rc = xds_encode (mqp->xdsout, PCK_CLNTCAP_FMT, CLNTCAP1, CLNTCAP2, clntcap);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode clntcap failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode clntcap failed %d.", rc);
return NS_FAILURE;
}
@ -155,8 +150,7 @@ unsigned long pck_send_error(mqp *mqplib, mqpacket *mqp, char *fmt, ...) {
rc = xds_encode (mqp->xdsout, PCK_ERROR_FMT, log_buf);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode error failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode error failed %d.", rc);
return NS_FAILURE;
}
@ -172,8 +166,7 @@ unsigned long pck_send_auth(mqp *mqplib, mqpacket *mqp, char *username, char *pa
rc = xds_encode (mqp->xdsout, PCK_AUTH_FMT, username, password);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode auth failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode auth failed %d.", rc);
return NS_FAILURE;
}
@ -192,15 +185,13 @@ pck_commit_data (mqp * mqplib, mqpacket * mqpck)
rc = xds_encode (mqpck->xdsout, "xmlstop");
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode xmlstop failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode xmlstop failed %d.", rc);
return NS_FAILURE;
}
}
rc = xds_getbuffer (mqpck->xdsout, XDS_GIFT, (void **) &buffer, &buflen) ;
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("OutBuffer is Full. %d", rc);
MQLOG(mqplib, MQLOG_WARNING, "OutBuffer is Full. %d", rc);
return NS_FAILURE;
}
mqbuffer_add(mqpck->outbuf, buffer, buflen);
@ -225,8 +216,7 @@ pck_send_message_struct(mqp *mqplib, mqpacket *mqpck, structentry *mystruct, int
mydata = (mystruct[i].readcb)(data, &rc);
rc = xds_encode (mqpck->xdsout, "string", mydata);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode header failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode pstring in message_struct failed %d.", rc);
return NS_FAILURE;
}
@ -237,22 +227,19 @@ pck_send_message_struct(mqp *mqplib, mqpacket *mqpck, structentry *mystruct, int
myint = *(int *)mydata;
rc = xds_encode (mqpck->xdsout, "int32", myint);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode header failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode int in message_struct failed %d.", rc);
return NS_FAILURE;
}
break;
case STR_STR:
mydata = data + mystruct[i].offset;
if (strlen(mydata) > mystruct[i].size) {
if (mqplib->logger)
mqplib->logger ("String In Column %d is too long (%d > %d). Not Encoding", i, strlen(mydata), mystruct[i].size);
MQLOG(mqplib, MQLOG_WARNING,"String In Column %d is too long (%d > %d). Not Encoding", i, strlen(mydata), mystruct[i].size);
break;
}
rc = xds_encode (mqpck->xdsout, "string", (char *)mydata);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode header failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode string in message_struct failed %d.", rc);
return NS_FAILURE;
}
break;
@ -260,8 +247,7 @@ pck_send_message_struct(mqp *mqplib, mqpacket *mqpck, structentry *mystruct, int
}
rc = xds_getbuffer (mqpck->xdsout, XDS_GIFT, (void **) &buffer, &bufferlen) ;
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("OutBuffer is Full. %d", rc);
MQLOG(mqplib,MQLOG_WARNING, "OutBuffer is Full. %d", rc);
return NS_FAILURE;
}
if (pck_prepare(mqplib, mqpck, PCK_SENDTOQUEUE) != NS_SUCCESS) {
@ -270,8 +256,7 @@ pck_send_message_struct(mqp *mqplib, mqpacket *mqpck, structentry *mystruct, int
rc = xds_encode (mqpck->xdsout, PCK_SENDTOQUEUE_FMT, destination, bufferlen, topic, buffer, bufferlen);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode message failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode message failed %d.", rc);
return NS_FAILURE;
}
(mqlib_free)(buffer);
@ -289,8 +274,7 @@ pck_send_joinqueue(mqp *mqplib, mqpacket *mqpck, char *queue, int flags, char *f
rc = xds_encode (mqpck->xdsout, PCK_JOINQUEUE_FMT, queue, flags, filter);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode joinqueue failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode joinqueue failed %d.", rc);
return NS_FAILURE;
}
@ -307,8 +291,7 @@ pck_send_queueinfo(mqp *mqplib, mqpacket *mqpck, char *queue, char *info, int fl
rc = xds_encode (mqpck->xdsout, PCK_QUEUEINFO_FMT, queue, flags, info);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode queueinfo failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING, "xds encode queueinfo failed %d.", rc);
return NS_FAILURE;
}
@ -325,8 +308,7 @@ pck_send_queue_mes(mqp *mqplib, mqpacket *mqpck, char *queue, char *topic, void
rc = xds_encode (mqpck->xdsout, PCK_MSGFROMQUEUE_FMT, queue, topic, data, len, messid, timestamp, from);
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds encode msgfromqueue failed %d.", rc);
MQLOG(mqplib, MQLOG_WARNING,"xds encode msgfromqueue failed %d.", rc);
return NS_FAILURE;
}

View file

@ -27,7 +27,7 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include "defines.h"
#include "libmq.h"
#include "list.h"
#include "packet.h"
#include "xds.h"
@ -84,8 +84,39 @@ myeng standeng[] = {
void *(*mqlib_malloc)(size_t) = malloc;
void (*mqlib_free)(void *) = free;
#ifdef DEBUG
void dotrace(mqp *mqplib, DEBUG_LEVEL level, char *fname, int lno, char *fnct, char *fmt, ...) {
va_list ap;
char logbuf[BUFSIZE];
if (level >= mqplib->debug) {
va_start(ap, fmt);
vsnprintf(logbuf, BUFSIZE, fmt, ap);
va_end(ap);
if (!mqplib->logger) {
fprintf(stderr, "TRACE(%d) %s:%d(%s): %s\n", level, fname, lno, fnct, logbuf);
} else {
mqplib->logger(level, "TRACE(%d) %s:%d(%s): %s\n", level, fname, lno, fnct, logbuf);
}
}
}
#endif
void MQLOG(mqp *mqplib, LOG_LEVEL level, char *fmt, ...) {
va_list ap;
char logbuf[BUFSIZE];
if (level >= mqplib->loglvl) {
va_start(ap, fmt);
vsnprintf(logbuf, BUFSIZE, fmt, ap);
va_end(ap);
if (!mqplib->logger) {
fprintf(stdout, "%d: %s\n", level, logbuf);
} else {
mqplib->logger(level, logbuf);
}
}
}
mqp *init_mqlib () {
mqp *mqplib;
@ -94,6 +125,8 @@ mqp *init_mqlib () {
mqplib->logger = NULL;
mqplib->callback = NULL;
mqplib->myengines = standeng;
mqplib->loglvl = 5;
mqplib->debug = 5;
return mqplib;
}
@ -124,14 +157,19 @@ void pck_set_data(mqpacket *mqp, void *data) {
mqp->cbarg = data;
}
void pck_set_loglvl(mqp *mqplib, LOG_LEVEL level) {
mqplib->loglvl = level;
}
void pck_set_dbglvl(mqp *mqplib, DEBUG_LEVEL level) {
mqplib->debug = level;
}
xds_t *
pck_init_engines (mqp *mqplib, int type, int direction) {
int i, rc = 0;
xds_t *xds;
if (xds_init (&xds, direction) != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds init failed: %s", strerror (errno));
MQLOG(mqplib, MQLOG_CRITICAL, "xds init failed: %s", strerror (errno));
return NULL;
}
i = 0;
@ -141,8 +179,7 @@ pck_init_engines (mqp *mqplib, int type, int direction) {
rc = xds_register (xds, mqplib->myengines[i].myname, mqplib->myengines[i].ptr, NULL);
}
if (rc != XDS_OK) {
if (mqplib->logger)
mqplib->logger ("xds_register failed for %s", mqplib->myengines[i].myname);
MQLOG(mqplib, MQLOG_WARNING, "xds_register failed for %s", mqplib->myengines[i].myname);
xds_destroy (xds);
return NULL;
}
@ -211,8 +248,8 @@ pck_new_connection (mqp *mqplib, int fd, int type, int contype)
case PCK_IS_SERVER:
break;
default:
if (mqplib->logger)
mqplib->logger ("pck_new_connection invalid type %d", contype);
MQLOG(mqplib, MQLOG_WARNING,"pck_new_connection invalid type %d", contype);
break;
}
return mqpck;
@ -248,8 +285,7 @@ read_fd (mqp *mqplib, mqpacket *mqp)
i = read (mqp->sock, buf, BUFSIZE);
if ((i < 0) && (i != EAGAIN)) {
/* error */
if (mqplib->logger)
mqplib->logger("Failed to Read fd %d(%d): %s", mqp->sock,i, strerror(errno));
MQLOG(mqplib, MQLOG_WARNING, "Failed to Read fd %d(%d): %s", mqp->sock,i, strerror(errno));
close_fd (mqplib, mqp);
/* XXX close and clean up */
return NS_FAILURE;
@ -273,18 +309,14 @@ read_fd (mqp *mqplib, mqpacket *mqp)
return NS_SUCCESS;
}
}
if (mqplib->logger) {
mqplib->logger ("processed %d bytes %d left", i, mqp->inbuf->off);
}
TRACE(mqplib, MQDBG4, "processed %d bytes %d left", i, mqp->inbuf->off);
return NS_SUCCESS;
}
int
close_fd (mqp *mqplib, mqpacket * mqp)
{
if (mqplib->logger) {
mqplib->logger ("Closing %d fd", mqp->sock);
}
TRACE(mqplib, MQDBG2, "Closing %d fd", mqp->sock);
close (mqp->sock);
pck_del_connection (mqplib, mqp);
@ -310,8 +342,7 @@ write_fd (mqp *mqplib, mqpacket * mqp)
return NS_SUCCESS;
}
/* something went wrong sending the data */
if (mqplib->logger)
mqplib->logger ("Error Sending on fd %d(%d): %s", mqp->sock, i, strerror(errno));
MQLOG(mqplib, MQLOG_WARNING, "Error Sending on fd %d(%d): %s", mqp->sock, i, strerror(errno));
close_fd (mqplib, mqp);
return NS_FAILURE;
} else if (i == 0) {
@ -341,8 +372,7 @@ write_fd (mqp *mqplib, mqpacket * mqp)
#endif
} else {
/* somethign went wrong encoding the data */
if (mqplib->logger)
mqplib->logger ("No Data to Send?");
TRACE(mqplib, MQDBG2, "No Data to Send?");
#if 0
close_fd (mqplib, mqp);
return NS_FAILURE;

View file

@ -21,18 +21,18 @@
** $Id$
*/
#include "libmq.h"
#include "list.h"
#include "packet.h"
#include "simpleif.h"
#include <fcntl.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "defines.h"
#include "list.h"
#include "packet.h"
#include "sock.h"
list_t *connections;
typedef struct sc {
@ -56,41 +56,41 @@ int pck_simple_callback(void *mqplib, mqpacket *mqp) {
switch (mqp->inmsg.MSGTYPE) {
case PCK_ACK:
if (MQS_S_FLAG_IS_GOTSRVCAP(mqp)) {
pck_logger("Got CLNTCAP ACK");
TRACE(mqplib, MQDBG2, "Got CLNTCAP ACK");
MQS_S_FLAG_SET_SENTAUTH(mqp);
pck_send_auth(mqplib, mqp, mqp->si.username, mqp->si.password);
} else if (MQS_S_FLAG_IS_SENTAUTH(mqp)) {
pck_logger("login Ack'd");
TRACE(mqplib, MQDBG2, "login Ack'd");
type = PCK_SMP_LOGINOK;
MQS_S_FLAG_SET_CONNECTOK(mqp);
}
break;
case PCK_SRVCAP:
pck_logger("Got ServerCap");
TRACE(mqplib, MQDBG2, "Got ServerCap");
MQS_S_FLAG_SET_GOTSRVCAP(mqp);
pck_send_clntcap(mqplib, mqp);
break;
case PCK_ERROR:
if (MQS_S_FLAG_IS_GOTSRVCAP(mqp)) {
pck_logger("Server rejected out clientcap for %s", mqp->inmsg.data.string);
type = PCK_SMP_CLNTCAPREJ;
MQLOG(mqplib, MQLOG_CRITICAL, "Server rejected out clientcap for %s", mqp->inmsg.data.string);
/* type = PCK_SMP_CLNTCAPREJ; */
return NS_FAILURE;
} else if (MQS_S_FLAG_IS_SENTAUTH(mqp)) {
pck_logger("Server rejected out Login: %s", mqp->inmsg.data.string);
type = PCK_SMP_AUTHREJ;
MQLOG(mqplib, MQLOG_CRITICAL, "Server rejected out Login: %s", mqp->inmsg.data.string);
/* type = PCK_SMP_AUTHREJ; */
return NS_FAILURE;
}
break;
case PCK_QUEUEINFO:
pck_logger("Got QueueInfo for %s: %d (%s)", mqp->inmsg.data.joinqueue.queue, mqp->inmsg.data.joinqueue.flags, mqp->inmsg.data.joinqueue.filter);
TRACE(mqplib, MQDBG2, "Got QueueInfo for %s: %ld (%s)", mqp->inmsg.data.joinqueue.queue, mqp->inmsg.data.joinqueue.flags, mqp->inmsg.data.joinqueue.filter);
type = PCK_SMP_QUEUEINFO;
break;
case PCK_MSGFROMQUEUE:
pck_logger("Got Message from Queue %s sent by %s on %d with topic %s with messid %d", mqp->inmsg.data.sendmsg.queue, mqp->inmsg.data.sendmsg.from, mqp->inmsg.data.sendmsg.timestamp, mqp->inmsg.data.sendmsg.topic, mqp->inmsg.data.sendmsg.messid);
TRACE(mqplib, MQDBG2, "Got Message from Queue %s sent by %s on %ld with topic %s with messid %ld", mqp->inmsg.data.sendmsg.queue, mqp->inmsg.data.sendmsg.from, mqp->inmsg.data.sendmsg.timestamp, mqp->inmsg.data.sendmsg.topic, mqp->inmsg.data.sendmsg.messid);
type = PCK_SMP_MSGFROMQUEUE;
break;
default:
pck_logger("Uknown msgtype recieved: %xd", mqp->inmsg.MSGTYPE);
MQLOG(mqplib, MQLOG_WARNING, "Uknown msgtype recieved: %xd", mqp->inmsg.MSGTYPE);
}
if (type != 0) {
@ -100,31 +100,19 @@ int pck_simple_callback(void *mqplib, mqpacket *mqp) {
return NS_SUCCESS;
}
void pck_logger(char *fmt,...) {
va_list ap;
char log_buf[BUFSIZE];
if (sockconfig.debug == 1) {
va_start(ap, fmt);
vsnprintf(log_buf, BUFSIZE, fmt, ap);
va_end(ap);
// printf("MQ: %s\n", log_buf);
}
}
int init_socket(actioncbfunc *actioncb) {
connections = list_create(-1);
sockconfig.listenfd = -1;
sockconfig.listport = -1;
sockconfig.mqplib = init_mqlib();
sockconfig.debug = 0;
sockconfig.actioncb = actioncb;
pck_set_logger(sockconfig.mqplib, pck_logger);
pck_set_callback(sockconfig.mqplib, pck_simple_callback);
return NS_SUCCESS;
}
int debug_socket(int i) {
sockconfig.debug = i;
pck_set_dbglvl(sockconfig.mqplib, i);
return i;
}
@ -170,7 +158,6 @@ pck_before_poll (struct pollfd ufds[MAXCONNECTIONS])
}
node = list_next (connections, node);
if (count == (MAXCONNECTIONS -1)) {
pck_logger("count %d", count);
break;
}
}
@ -281,7 +268,6 @@ pck_process ()
j = poll (ufds, i, 100);
if (j < 0) {
pck_logger("poll returned %d: %s", j, strerror(j));
return NS_FAILURE;
}
return pck_after_poll (ufds, j);
@ -380,7 +366,7 @@ pck_make_connection (char *hostname, char *username, char *password, long flags,
if (inet_aton(hostname, &ipad) == 0) {
if ((hp = gethostbyname(hostname)) == NULL) {
pck_logger("gethostbyname failed");
/* XXX Err vals */
return NS_FAILURE;
}
sa.sin_family = AF_INET;
@ -393,7 +379,7 @@ pck_make_connection (char *hostname, char *username, char *password, long flags,
if ((s = socket (AF_INET, SOCK_STREAM, 0)) < 0) {
printf ("Can't create socket\n");
/* XXX Err Vals */
return NS_FAILURE;
}
/* XXX bind */
@ -404,7 +390,7 @@ pck_make_connection (char *hostname, char *username, char *password, long flags,
flags = connect (s, (struct sockaddr *) &sa, sizeof (sa));
if (flags < 0) {
if (errno != EINPROGRESS) {
printf ("Connect Failed %s\n", strerror(errno));
/* XXX Err Vals */
return NS_FAILURE;
}
}
@ -423,7 +409,6 @@ pck_make_connection (char *hostname, char *username, char *password, long flags,
mqp->pollopts |= POLLIN;
pck_logger("OutGoing Connection on fd %d", s);
return s;
}
@ -496,12 +481,12 @@ pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *
xdstmp = pck_init_engines(sockconfig.mqplib, ENG_TYPE_XML, XDS_DECODE);
if (xdstmp == NULL) {
pck_logger("pck_decode_message xds init failed");
/* pck_logger("pck_decode_message xds init failed"); */
return NS_FAILURE;
}
if (xds_setbuffer (xdstmp, XDS_LOAN, sd->data, sd->len) != XDS_OK) {
pck_logger("pck_decode_message XDS setbuffer Failed");
/* pck_logger("pck_decode_message XDS setbuffer Failed"); */
xds_destroy(xdstmp);
return NS_FAILURE;
}
@ -514,7 +499,7 @@ pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *
destptr = (void *) target + mystruct[i].offset;
*(char **)destptr = strndup(string, strlen(string));
} else {
pck_logger("pck_decode_message, STR_PSTR failed\n");
/* pck_logger("pck_decode_message, STR_PSTR failed\n"); */
}
free(string);
break;
@ -524,7 +509,7 @@ pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *
destptr = (void *) target + mystruct[i].offset;
*((int *)destptr) = myint;
} else {
pck_logger("pck_decode_message, STR_INT failed\n");
/* pck_logger("pck_decode_message, STR_INT failed\n"); */
}
break;
case STR_STR:
@ -533,12 +518,13 @@ pck_decode_message(mq_data_senddata *sd, structentry *mystruct, int cols, void *
destptr = (void *) target + mystruct[i].offset;
strncpy((char *)destptr, string, mystruct[i].size);
} else {
pck_logger("pck_decode_message, STR_STR failed\n");
/* pck_logger("pck_decode_message, STR_STR failed\n"); */
}
free(string);
break;
default:
pck_logger( "pck_decode_message, unknown type");
/* pck_logger( "pck_decode_message, unknown type"); */
break;
}
}
xds_destroy(xdstmp);

View file

@ -22,7 +22,7 @@
*/
#include <stdio.h>
#include <string.h>
#include "defines.h"
#include "libmq.h"
#include "packet.h"
#include "xds.h"

View file

@ -28,10 +28,10 @@
#ifdef HAVE_BACKTRACE
#include <execinfo.h>
#endif
#include "libmq.h"
#include "defines.h"
#include "signal.h"
#include "log.h"
#include "sock.h"
#include "dns.h"
#include "packet.h"
#include "serversock.h"

View file

@ -207,7 +207,8 @@ MQS_sock_start ()
mqssetup.connections = list_create(-1);
mqssetup.mqplib = init_mqlib();
pck_set_logger(mqssetup.mqplib, MQS_Logger);
/* pck_set_logger(mqssetup.mqplib, MQS_Logger); */
pck_set_dbglvl(mqssetup.mqplib, 0);
pck_set_callback(mqssetup.mqplib, MQS_Callback);
event_init();