#include #include #include #include #include #include #include #define BUFFER_SIZE 256 * 1024#define PACKET_SIZE 8192#define HEADER_SIZE 16typedef struct { char **buffer; int *length; int size; int read_idx; int write_idx; int count; pthread_mutex_t lock; pthread_cond_t not_full; pthread_cond_t not_empty;} CircularBuffer;typedef struct { CircularBuffer *buffer; const char *filename;} ReaderParams;typedef struct { CircularBuffer *buffer; const char *dest_ip; int dest_port;} SenderParams;void circularBufferInit(CircularBuffer *buffer, int size) { buffer->buffer = (char**)malloc(sizeof(char*) * size); buffer->length = (int*)malloc(sizeof(int) * size); for (int i = 0; i < size; i++) { buffer->buffer[i] = (char*)malloc(PACKET_SIZE); buffer->length[i] = 0; } buffer->size = size; buffer->read_idx = 0; buffer->write_idx = 0; buffer->count = 0; pthread_mutex_init(&buffer->lock, NULL); pthread_cond_init(&buffer->not_full, NULL); pthread_cond_init(&buffer->not_empty, NULL);}void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) { pthread_mutex_lock(&buffer->lock); while (buffer->count == buffer->size) { pthread_cond_wait(&buffer->not_full, &buffer->lock); } char *packet = buffer->buffer[buffer->write_idx]; int padding_size = PACKET_SIZE - HEADER_SIZE - size; // 添加包头 memcpy(packet, "Custom Header", HEADER_SIZE); // 添加数据 memcpy(packet + HEADER_SIZE, data, size); // 补充0 memset(packet + HEADER_SIZE + size, 0, padding_size); // 记录数据包长度 buffer->length[buffer->write_idx] = size; buffer->write_idx = (buffer->write_idx + 1) % buffer->size; buffer->count++; pthread_cond_signal(&buffer->not_empty); pthread_mutex_unlock(&buffer->lock);}void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) { pthread_mutex_lock(&buffer->lock); while (buffer->count == 0) { pthread_cond_wait(&buffer->not_empty, &buffer->lock); } char *packet = buffer->buffer[buffer->read_idx]; int packet_size = buffer->length[buffer->read_idx]; int padding_size = PACKET_SIZE - HEADER_SIZE - packet_size; // 读取数据 memcpy(data, packet + HEADER_SIZE, packet_size); // 填补的0不需要被读取,直接跳过 buffer->read_idx = (buffer->read_idx + 1) % buffer->size; buffer->count--; pthread_cond_signal(&buffer->not_full); pthread_mutex_unlock(&buffer->lock);}void circularBufferDestroy(CircularBuffer *buffer) { pthread_mutex_destroy(&buffer->lock); pthread_cond_destroy(&buffer->not_full); pthread_cond_destroy(&buffer->not_empty); for (int i = 0; i < buffer->size; i++) { free(buffer->buffer[i]); } free(buffer->buffer); free(buffer->length); buffer->size = 0; buffer->read_idx = 0; buffer->write_idx = 0; buffer->count = 0;}void *readerThread(void *params) { ReaderParams *readerParams = (ReaderParams*)params; FILE *file = fopen(readerParams->filename, "rb"); if (file == NULL) { perror("Failed to open file"); return NULL; } char data[PACKET_SIZE]; size_t bytesRead = 0; while ((bytesRead = fread(data, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) { circularBufferWrite(readerParams->buffer, data, bytesRead); usleep(1000); } fclose(file); return NULL;}int createUdpSocket(const char *dest_ip, int dest_port) { int sock = socket(AF_INET, SOCK_DGRAM, 0); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr(dest_ip); server_addr.sin_port = htons(dest_port); if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { perror("Failed to connect to UDP server"); close(sock); return -1; } return sock;}void *senderThread(void *params) { SenderParams *senderParams = (SenderParams*)params; int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port); char data[PACKET_SIZE]; while (1) { circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE); send(sock, data, PACKET_SIZE - HEADER_SIZE, 0); usleep(1000); } close(sock); return NULL;}int main() { CircularBuffer buffer; circularBufferInit(&buffer, BUFFER_SIZE); ReaderParams readerParams; readerParams.buffer = &buffer; readerParams.filename = "input.txt"; SenderParams senderParams; senderParams.buffer = &buffer; senderParams.dest_ip = "127.0.0.1"; // 目标地址 senderParams.dest_port = 12345; // 目标端口 pthread_t readerThreadId; pthread_t senderThreadId; pthread_create(&readerThreadId, NULL, readerThread, &readerParams); pthread_create(&senderThreadId, NULL, senderThread, &senderParams); pthread_join(readerThreadId, NULL); pthread_join(senderThreadId, NULL); circularBufferDestroy(&buffer); return 0;}
备注
#include #include #include #include #include #include #include #define BUFFER_SIZE 256 * 1024#define PACKET_SIZE 8192#define HEADER_SIZE 16typedef struct { char **buffer; int *length; int size; int read_idx; int write_idx; int count; pthread_mutex_t lock; pthread_cond_t not_full; pthread_cond_t not_empty;} CircularBuffer;typedef struct { CircularBuffer *buffer; const char *filename;} ReaderParams;typedef struct { CircularBuffer *buffer; const char *dest_ip; int dest_port;} SenderParams;void circularBufferInit(CircularBuffer *buffer, int size) { buffer->buffer = (char**)malloc(sizeof(char*) * size); buffer->length = (int*)malloc(sizeof(int) * size); for (int i = 0; i < size; i++) { buffer->buffer[i] = (char*)malloc(PACKET_SIZE); buffer->length[i] = 0; } buffer->size = size; buffer->read_idx = 0; buffer->write_idx = 0; buffer->count = 0; pthread_mutex_init(&buffer->lock, NULL); pthread_cond_init(&buffer->not_full, NULL); pthread_cond_init(&buffer->not_empty, NULL);}void circularBufferWrite(CircularBuffer *writbuffer, const void *data, size_t size) { pthread_mutex_lock(&writbuffer->lock); while (writbuffer->count == writbuffer->size) { pthread_cond_wait(&writbuffer->not_full, &writbuffer->lock); } char *packet = writbuffer->buffer[writbuffer->write_idx]; int padding_size = PACKET_SIZE - HEADER_SIZE - size; // 添加包头 memcpy(packet, "Custom Header", HEADER_SIZE); // 添加数据 memcpy(packet + HEADER_SIZE, data, size); // 补充0 memset(packet + HEADER_SIZE + size, 0, padding_size); // 记录数据包长度 writbuffer->length[writbuffer->write_idx] = size; writbuffer->write_idx = (writbuffer->write_idx + 1) % writbuffer->size; writbuffer->count++; pthread_cond_signal(&writbuffer->not_empty); pthread_mutex_unlock(&writbuffer->lock);}void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) { pthread_mutex_lock(&buffer->lock); while (buffer->count == buffer->size) { pthread_cond_wait(&buffer->not_full, &buffer->lock); } char *packet = buffer->buffer[buffer->write_idx]; int padding_size = PACKET_SIZE - HEADER_SIZE - size; // 添加包头 memcpy(packet, "Custom Header", HEADER_SIZE); // 添加数据 memcpy(packet + HEADER_SIZE, data, size); // 补充0 memset(packet + HEADER_SIZE + size, 0, padding_size); // 记录数据包长度 buffer->length[buffer->write_idx] = size; buffer->write_idx = (buffer->write_idx + 1) % buffer->size; buffer->count++; pthread_cond_signal(&buffer->not_empty);//发送信号通知其他等待中的线程,表示缓冲区不再为空,这可能是用于唤醒一个等待从缓冲区中读取数据的消费者线程。 pthread_mutex_unlock(&buffer->lock);//解锁缓冲区的互斥锁,表示写入操作已完成,}void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) { pthread_mutex_lock(&buffer->lock); while (buffer->count == 0) { pthread_cond_wait(&buffer->not_empty, &buffer->lock); } char *packet = buffer->buffer[buffer->read_idx]; int packet_size = buffer->length[buffer->read_idx]; int padding_size = PACKET_SIZE - HEADER_SIZE - packet_size; // 读取数据 memcpy(data, packet + HEADER_SIZE, packet_size); // 填补的0不需要被读取,直接跳过 buffer->read_idx = (buffer->read_idx + 1) % buffer->size; buffer->count--; pthread_cond_signal(&buffer->not_full); pthread_mutex_unlock(&buffer->lock);}void circularBufferDestroy(CircularBuffer *buffer) { pthread_mutex_destroy(&buffer->lock); pthread_cond_destroy(&buffer->not_full); pthread_cond_destroy(&buffer->not_empty); for (int i = 0; i < buffer->size; i++) { free(buffer->buffer[i]); } free(buffer->buffer); free(buffer->length); buffer->size = 0; buffer->read_idx = 0; buffer->write_idx = 0; buffer->count = 0;}void *readerThread(void *params) { ReaderParams *readerParams = (ReaderParams*)params; FILE *file = fopen(readerParams->filename, "rb"); if (file == NULL) { perror("Failed to open file"); return NULL; } char data[PACKET_SIZE]; size_t bytesRead = 0; while ((bytesRead = fread(data, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) { circularBufferWrite(readerParams->buffer, data, bytesRead); usleep(1000); } fclose(file); return NULL;}int createUdpSocket(const char *dest_ip, int dest_port) { int sock = socket(AF_INET, SOCK_DGRAM, 0); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr(dest_ip); server_addr.sin_port = htons(dest_port); if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { perror("Failed to connect to UDP server"); close(sock); return -1; } return sock;}void *senderThread(void *params) { SenderParams *senderParams = (SenderParams*)params; int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port); char data[PACKET_SIZE]; while (1) { circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE); send(sock, data, PACKET_SIZE - HEADER_SIZE, 0); usleep(1000); } close(sock); return NULL;}int main() { CircularBuffer buffer; circularBufferInit(&buffer, BUFFER_SIZE); ReaderParams readerParams; readerParams.buffer = &buffer; readerParams.filename = "input.txt"; SenderParams senderParams; senderParams.buffer = &buffer; senderParams.dest_ip = "127.0.0.1"; // 目标地址 senderParams.dest_port = 12345; // 目标端口 pthread_t readerThreadId; pthread_t senderThreadId; pthread_create(&readerThreadId, NULL, readerThread, &readerParams); pthread_create(&senderThreadId, NULL, senderThread, &senderParams); pthread_join(readerThreadId, NULL); pthread_join(senderThreadId, NULL); circularBufferDestroy(&buffer); return 0;}
来源地址:https://blog.csdn.net/weixin_38849487/article/details/133697564