-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogical-clock.c
More file actions
294 lines (217 loc) · 8.9 KB
/
logical-clock.c
File metadata and controls
294 lines (217 loc) · 8.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/*------------Lamport's Logical Clock---------------------------*/
/*------------Naive Implementation with 3 processes--------------*/
/*------------Source code: logical-clock.c----------------------------*/
/*
To run the program, compile with:
gcc -o lamportclock logical-clock.c
Now, first run process B in one window: ./lamportclock -p2
Then, run both process A and C in separate windows:
./lamportclock -p1
./lamportclock -p3
This is a naive implementation of lamport's clock, in which Process B receives
one message each from process A and C. Then, process B sends out one message
to process A and one message to process C. This is the reason, process B must
be run earlier than process A and C, as process B will be waiting for message
to be received from A and C.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUFFER_SIZE 32
#define PROCESS_A_PORT 5000
#define PROCESS_B_PORT 6000
#define PROCESS_C_PORT 7000
/* defining a macro to get max value out of two values */
#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
/*
We maintain a structure which contains our message.
The structure also contain the latest logical clock value of the
sending node.
*/
struct message {
int logical_clock;
char msg_content[BUFFER_SIZE];
};
/*
A wrapper function to handle all the errors while dealing
with various networking functions.
*/
int err_handler(int n, char * error) {
if(n < 0) {
perror(error);
exit(EXIT_FAILURE);
}
return n;
};
/*
A wrapper around sendto() function, so as to update the logical clock value
before sending a message
*/
int send_message(int sock, struct message * msg, size_t size, int flag,
struct sockaddr_in * recvr, socklen_t addrsize) {
/* increment the logical clock value for message send event*/
msg->logical_clock += 1;
/*send the UDP packet*/
return sendto(sock, msg, size, flag, recvr, addrsize);
};
/*
A wrapper around recvfrom() message to update logical clock
*/
int recv_message(int sock, struct message * rcvd_msg, size_t size, struct message * selfmsg) {
/*
increment the current logical clock value for message receive event
*/
selfmsg->logical_clock += 1;
/*receive the UDP packet*/
int recvd_bytes = recvfrom(sock, rcvd_msg, size, 0, NULL, NULL);
/*we are putting sender address and size of addr as NULL as we don't want to store
info about sender*/
/*
update the current logical clock according to received message
*/
selfmsg->logical_clock = MAX(selfmsg->logical_clock, rcvd_msg->logical_clock + 1);
return recvd_bytes;
}
void processA(int sock) {
/*
Sends out a message to process B.
Receives a message from process B.
*/
struct message mesg_a = {0, "A:Hello"}; /*initialising the message*/
/*Initial vlaue of logical_clock for each process is zero.*/
printf("\nInitial logical clock value of process A: %d\n", mesg_a.logical_clock);
struct message recvd_from_b;
struct sockaddr_in procB_address;
procB_address.sin_family = AF_INET;
procB_address.sin_port = htons(PROCESS_B_PORT);
procB_address.sin_addr.s_addr = INADDR_ANY;
socklen_t size_addr = sizeof(struct sockaddr_in);
printf("\nEvent 1 of Process A: snd msg to process B\n");
/*Sending the message to process B*/
err_handler(send_message(sock, &mesg_a, sizeof(mesg_a), 0,
(struct sockaddr_in *) &procB_address, size_addr), "SendError");
printf("Logical clock value of Processs A after event 1: %d\n", mesg_a.logical_clock);
printf("\nEvent 2 of Process A: rcv msg from process B\n");
/* Waiting for a message to be received */
printf("Waiting to receive message...\n");
err_handler(recv_message(sock, &recvd_from_b, sizeof(struct message), &mesg_a),
"ReceivingError");
printf("Logical clock value of Processs A after event 2: %d\n", mesg_a.logical_clock);
};
void processB(int sock) {
/*
Receives a message from process A.
Receives a message from process C.
Sends a message to process A.
Sends a message to process C.
*/
struct message mesg_b = {0, "B:Hello"}; /*initialising the message*/
/*Initial vlaue of logical_clock for each process is zero.*/
printf("\nInitial logical clock value of process B: %d\n", mesg_b.logical_clock);
struct message recvd_msg;
struct sockaddr_in recvr_address;
/*setting up address for any other process running in the same net*/
recvr_address.sin_family = AF_INET;
recvr_address.sin_addr.s_addr = INADDR_ANY;
/*Waiting for the message from process A and C*/
/* loop runs for 2 times for two time waiting for receive by process B*/
for(int i = 0; i < 2; i++) {
printf("\nEvent %d of Process B: recv message\n", i + 1);
printf("Waiting to receive message...\n");
err_handler(recv_message(sock, &recvd_msg, sizeof(struct message), &mesg_b),
"ReceivingError");
printf("Logical clock value of Process B after event %d: %d\n",
i + 1, mesg_b.logical_clock);
}
/* Now process B sends message to process A*/
/* setting port address for process A */
recvr_address.sin_port = htons(PROCESS_A_PORT);
printf("\nEvent 3 of Process B: snd msg to process A\n");
err_handler(send_message(sock, &mesg_b, sizeof(mesg_b), 0,
(struct sockaddr_in *) &recvr_address, sizeof(recvr_address)), "SendingError");
printf("Logical clock value of Processs B after event 3: %d\n", mesg_b.logical_clock);
/* Now process B sends message to process C*/
/* setting port address for process C */
recvr_address.sin_port = htons(PROCESS_C_PORT);
printf("\nEvent 4 of Process B: snd msg to process C\n");
err_handler(send_message(sock, &mesg_b, sizeof(mesg_b), 0,
(struct sockaddr_in *) &recvr_address, sizeof(recvr_address)), "SendingError");
printf("Logical clock value of Processs B after event 4: %d\n", mesg_b.logical_clock);
};
void processC(int sock) {
/*
Sends out a message to process B.
Receives a message from process B.
*/
struct message mesg_c = {0, "C:Hello"}; /*initialising the message*/
/*Initial vlaue of logical_clock for each process is zero.*/
printf("\nInitial logical clock value of process C: %d\n", mesg_c.logical_clock);
struct message recvd_from_b;
struct sockaddr_in procB_address;
procB_address.sin_family = AF_INET;
procB_address.sin_port = htons(PROCESS_B_PORT);
procB_address.sin_addr.s_addr = INADDR_ANY;
socklen_t size_addr = sizeof(struct sockaddr_in);
printf("\nEvent 1 of Process C: snd msg to process B\n");
/*Sending the message to process B*/
err_handler(send_message(sock, &mesg_c, sizeof(mesg_c), 0,
(struct sockaddr_in *) &procB_address, size_addr), "SendError");
printf("Logical clock value of Processs C after event 1: %d\n", mesg_c.logical_clock);
printf("\nEvent 2 of Process C: rcv msg from process B\n");
/* Waiting for a message to be received */
printf("Waiting to receive message...\n");
err_handler(recv_message(sock, &recvd_from_b, sizeof(struct message), &mesg_c),
"ReceivingError");
printf("Logical clock value of Processs C after event 2: %d\n", mesg_c.logical_clock);
}
int main(int argc, char * argv[]) {
short process_id;
/* to provide different ports to the 3 processes*/
int process_port[3] = {PROCESS_A_PORT, PROCESS_B_PORT, PROCESS_C_PORT};
/* the program requires an argument giving process id on comand-line.
Process id could have any value from set {1, 2, 3}. We are simulating the
program on 3 processes. If the argument is not given then instruct and exit. */
if(argc != 2) {
printf("\nUsage: ./lamportclock -p<1|2|3>\n\n");
exit(EXIT_FAILURE);
}
int sock_fd; /*file descriptor of socket*/
int port; /* contains the assigned port to the current process */
/* address of the current process.*/
struct sockaddr_in process_address;
socklen_t size_of_addr = sizeof(struct sockaddr_in);
/* zero out process_address */
memset(&process_address, 0, size_of_addr);
/*ensure process_id argument is in proper format*/
if(strlen(argv[1]) != 3) {
printf("\nUsage: ./lamportclock -p<1|2|3>\n\n");
exit(EXIT_FAILURE);
}
process_id = atoi(argv[1] + 2); /*get 3rd character from the argument as process_id*/
/* if process_id value is zero, then atoi() could not convert the given
argument to a valid integer. */
if(process_id == 0 || process_id > 3) {
printf("\nInvalid Argument.");
exit(EXIT_FAILURE);
}
port = process_port[process_id - 1]; /*selecting one port from the array for current process*/
/*Create a UDP socket*/
sock_fd = err_handler(socket(AF_INET, SOCK_DGRAM, 0), "SocketCreationError");
printf("\nSocket created.\n");
/*setting up the process address*/
process_address.sin_family = AF_INET;
process_address.sin_port = htons(port);
process_address.sin_addr.s_addr = INADDR_ANY;
/* We will bind the current process to above set-up address */
err_handler(bind(sock_fd, (struct sockaddr_in *) &process_address, size_of_addr), "BindError");
/*
According to process_id we will invoke respective functions.
Only one of these functions will run, for any instance of this program.
*/
(process_id == 1) ? processA(sock_fd) :
((process_id == 2) ? processB(sock_fd) : processC(sock_fd));
return 0;
}