4835895 [rkeene@sledge /home/rkeene/tmp]$ cat -n mirrorget.c
  1 #include "pt-1.2.1-yieldterm-2/pt.h"
  2 #include <opennet.h>
  3 #include <unistd.h>
  4 #include <stdlib.h>
  5 #include <string.h>
  6 #include <stdio.h>
  7 #include <time.h>
  8 
  9 struct source_info {
 10     const char  *url;
 11     double      speed;
 12     void        *handle;
 13     off_t       offset;
 14     int         active;
 15     int         valid;
 16     time_t      start_time;
 17     off_t       bytes_read;
 18 };
 19 
 20 struct thread_info {
 21     struct source_info  *source_list;
 22     struct source_info  *src;
 23     FILE                *fp;
 24     off_t               start_offset;
 25     off_t               end_offset;
 26     off_t               bytes_read;
 27     int                 source_list_cnt;
 28 };
 29 
 30 static inline void download_begin(struct source_info *s, off_t offset) {
 31     int fseek_ret;
 32 
 33     s->active = 0;
 34     s->valid = 0;
 35     s->bytes_read = 0;
 36     s->start_time = time(NULL);
 37     s->handle = fopen_net(s->url, "r");
 38 
 39     if (!s->handle) {
 40         printf("Could not open \"%s\"\n", s->url);
 41         return;
 42     }
 43 
 44     fseek_ret = fseeko_net(s->handle, offset, SEEK_SET);
 45     if (fseek_ret < 0) {
 46         fclose_net(s->handle);
 47         s->handle = NULL;
 48         return;
 49     }
 50 
 51     s->valid = 1;
 52     s->active = 1;
 53 
 54     return;
 55 }
 56 
 57 static inline void download_end(struct source_info *s) {
 58 
 59     if (s->handle) {
 60         fclose_net(s->handle);
 61         s->handle = NULL;
 62     }
 63 
 64     s->active = 0;
 65 
 66     return;
 67 }
 68 
 69 static inline void download_update_speed(struct source_info *s) {
 70     time_t curr_time;
 71 
 72     curr_time = time(NULL);
 73     if ((curr_time - 10) >= s->start_time) {
 74         s->speed = s->bytes_read / (curr_time - s->start_time);
 75     }
 76 
 77     return;
 78 }
 79 
 80 PT_THREAD(download_thread(struct pt *ptr, struct thread_info *tinfo)) {
 81     static char buf[8192];
 82     struct source_info *newsrc;
 83     size_t bytes_to_read, fread_ret, fwrite_ret;
 84     int i;
 85 
 86     PT_BEGIN(ptr);
 87 
 88     if (!tinfo->src) {
 89         PT_EXIT(ptr);
 90     }
 91 
 92     if (!tinfo->src->handle) {
 93         download_begin(tinfo->src, tinfo->start_offset + tinfo->bytes_read);
 94     }
 95 
 96     while (1) {
 97         PT_YIELD(ptr);
 98         bytes_to_read = tinfo->end_offset - tinfo->start_offset - tinfo->bytes_read;
 99 #ifdef DEBUG
100         printf("Running %p (%llu bytes left)\n", ptr, (unsigned long long) bytes_to_read);
101 #endif
102 
103         if (bytes_to_read == 0) {
104             break;
105         }
106 
107         newsrc = NULL;
108         for (i = 0; i < tinfo->source_list_cnt; i++) {
109             if (!tinfo->source_list[i].active && tinfo->source_list[i].valid) {
110                 if (tinfo->source_list[i].speed > tinfo->src->speed || tinfo->src->handle == NULL) {
111                     if (newsrc) {
112                         if (tinfo->source_list[i].speed > newsrc->speed) {
113                             newsrc = &tinfo->source_list[i];
114                         }
115                     } else {
116                         newsrc = &tinfo->source_list[i];
117                     }
118                 }
119             }
120         }
121 
122         if (newsrc && newsrc != tinfo->src) {
123             printf("Selected new source: %s\n", newsrc->url);
124             download_end(tinfo->src);
125             tinfo->src = newsrc;
126             download_begin(tinfo->src, tinfo->start_offset + tinfo->bytes_read);
127         }
128 
129         if (tinfo->src->handle) {
130             if (bytes_to_read > sizeof(buf)) {
131                 bytes_to_read = sizeof(buf);
132             }
133 
134             fread_ret = fread_net(buf, sizeof(buf[0]), bytes_to_read, tinfo->src->handle);
135 
136             if (feof_net(tinfo->src->handle)) {
137                 download_end(tinfo->src);
138             }
139 
140             if (fread_ret == 0) {
141                 continue;
142             }
143 
144 #ifdef DEBUG
145             printf("Writing %i bytes at %llu (%f bytes/sec)\n", (int) fread_ret, (unsigned long long)
	(tinfo->start_offset + tinfo->bytes_read), tinfo->src->speed);
146 #endif
147 
148 #ifndef __WIN32__
149             fseeko(tinfo->fp, tinfo->start_offset + tinfo->bytes_read, SEEK_SET);
150 #else
151             fseek(tinfo->fp, tinfo->start_offset + tinfo->bytes_read, SEEK_SET);
152 #endif
153             fwrite_ret = fwrite(buf, sizeof(buf[0]), fread_ret, tinfo->fp);
154             if (fwrite_ret != fread_ret) {
155                 fprintf(stderr, "Error writing to file.\n");
156                 break;
157             }
158 
159             tinfo->bytes_read += fread_ret;
160             tinfo->src->bytes_read += fread_ret;
161             download_update_speed(tinfo->src);
162         }
163     }
164 
165     if (tinfo->src) {
166         download_end(tinfo->src);
167         tinfo->src = NULL;
168     }
169 
170     PT_END(ptr);
171 }
172 
173 int main(int argc, char **argv) {
174     NETFILE *fp;
175     struct source_info *sinfo;
176     struct thread_info *tinfo;
177     struct pt *pt_info;
178     double total_speed;
179     off_t filesize = -1, sectionsize;
180     char *output_filename = NULL;
181     FILE *ofp;
182     int num_sources, idx;
183     int all_threads_done;
184     int i, thread_iterations;
185 
186     num_sources = argc - 1;
187 
188     if (num_sources == 0) {
189         printf("Usage: mirrorget <url1> [<url2> [<url3> [...]]]\n");
190         return(EXIT_FAILURE);
191     }
192 
193     sinfo = malloc(sizeof(*sinfo) * num_sources);
194     if (!sinfo) {
195         perror("malloc");
196         return(EXIT_FAILURE);
197     }
198 
199     tinfo = malloc(sizeof(*tinfo) * num_sources);
200     if (!tinfo) {
201         perror("malloc");
202         return(EXIT_FAILURE);
203     }
204 
205     pt_info = malloc(sizeof(*pt_info) * num_sources);
206     if (!pt_info) {
207         perror("malloc");
208         return(EXIT_FAILURE);
209     }
210 
211     output_filename = strrchr(argv[1], '/');
212     if (output_filename) {
213         output_filename++;
214     } else {
215         output_filename = "mirrorget.out";
216     }
217 
218     ofp = fopen(output_filename, "wb");
219     if (!ofp) {
220         perror("fopen");
221         return(EXIT_FAILURE);
222     }
223 
224     if (filesize == -1) {
225         /* Determine file size */
226         fp = fopen_net(argv[1], "r");
227         filesize = flength_net(fp);
228         fclose_net(fp);
229         if (filesize == -1) {
230             fprintf(stderr, "Unable to determine file size of \"%s\".  Aborting.\n", argv[1]);
231             return(EXIT_FAILURE);
232         }
233     }
234     sectionsize = filesize / num_sources;
235     printf("File size = %llu; Section size = %llu\n", (unsigned long long) filesize, (unsigned long long) sectionsize);
236 
237     for (idx = 0; idx < num_sources; idx++) {
238         i = idx + 1;
239         sinfo[idx].url = argv[i];
240         sinfo[idx].speed = 0.0;
241         sinfo[idx].handle = NULL;
242         sinfo[idx].offset = 0;
243         sinfo[idx].valid = 1;
244         sinfo[idx].active = 1;
245 
246         tinfo[idx].src = &sinfo[idx];
247         tinfo[idx].fp = ofp;
248         tinfo[idx].start_offset = idx * sectionsize;
249         tinfo[idx].end_offset = (idx + 1) * sectionsize;
250         tinfo[idx].bytes_read = 0;
251         tinfo[idx].source_list = sinfo;
252         tinfo[idx].source_list_cnt = num_sources;
253 
254         PT_INIT(&pt_info[idx]);
255     }
256     tinfo[num_sources - 1].end_offset = filesize;
257 
258     thread_iterations = 0;
259     while (1) {
260         thread_iterations++;
261         all_threads_done = 1;
262         for (idx = 0; idx < num_sources; idx++) {
263             if (PT_SCHEDULE(download_thread(&pt_info[idx], &tinfo[idx]))) {
264                 all_threads_done = 0;
265 
266                 total_speed += tinfo[idx].src->speed;
267             }
268         }
269 
270         if (all_threads_done) {
271             break;
272         }
273 
274         if ((thread_iterations % 100) == 0) {
275             total_speed = 0.0;
276             for (idx = 0; idx < num_sources; idx++) {
277                 if (sinfo[idx].active) {
278                     total_speed += sinfo[idx].speed;
279                 }
280             }
281             printf("Curr speed: %f MB/sec\n", (total_speed / (1024.0 * 1024.0)));
282         }
283     }
284 
285     return(EXIT_SUCCESS);
286 }
4835896 [rkeene@sledge /home/rkeene/tmp]$

Click here to go back to the directory listing.
Click here to download this file.
last modified: 2008-02-04 11:55:41