1 #include "pt-1.2.1-yieldterm-2/pt.h" 2 #include <opennet.h> 3 #include <unistd.h> 4 #include <stdlib.h> 5 #include <string.h> 6 #include <stdio.h> 7 #include <time.h> 8 9 struct source_info { 10 const char *url; 11 double speed; 12 void *handle; 13 off_t offset; 14 int active; 15 int valid; 16 time_t start_time; 17 off_t bytes_read; 18 }; 19 20 struct thread_info { 21 struct source_info *source_list; 22 struct source_info *src; 23 FILE *fp; 24 off_t start_offset; 25 off_t end_offset; 26 off_t bytes_read; 27 int source_list_cnt; 28 }; 29 30 static inline void download_begin(struct source_info *s, off_t offset) { 31 int fseek_ret; 32 33 s->active = 0; 34 s->valid = 0; 35 s->bytes_read = 0; 36 s->start_time = time(NULL); 37 s->handle = fopen_net(s->url, "r"); 38 39 if (!s->handle) { 40 printf("Could not open \"%s\"\n", s->url); 41 return; 42 } 43 44 fseek_ret = fseeko_net(s->handle, offset, SEEK_SET); 45 if (fseek_ret < 0) { 46 fclose_net(s->handle); 47 s->handle = NULL; 48 return; 49 } 50 51 s->valid = 1; 52 s->active = 1; 53 54 return; 55 } 56 57 static inline void download_end(struct source_info *s) { 58 59 if (s->handle) { 60 fclose_net(s->handle); 61 s->handle = NULL; 62 } 63 64 s->active = 0; 65 66 return; 67 } 68 69 static inline void download_update_speed(struct source_info *s) { 70 time_t curr_time; 71 72 curr_time = time(NULL); 73 if ((curr_time - 10) >= s->start_time) { 74 s->speed = s->bytes_read / (curr_time - s->start_time); 75 } 76 77 return; 78 } 79 80 PT_THREAD(download_thread(struct pt *ptr, struct thread_info *tinfo)) { 81 static char buf[8192]; 82 struct source_info *newsrc; 83 size_t bytes_to_read, fread_ret, fwrite_ret; 84 int i; 85 86 PT_BEGIN(ptr); 87 88 if (!tinfo->src) { 89 PT_EXIT(ptr); 90 } 91 92 if (!tinfo->src->handle) { 93 download_begin(tinfo->src, tinfo->start_offset + tinfo->bytes_read); 94 } 95 96 while (1) { 97 PT_YIELD(ptr); 98 bytes_to_read = tinfo->end_offset - tinfo->start_offset - tinfo->bytes_read; 99 #ifdef DEBUG 100 printf("Running %p (%llu bytes left)\n", ptr, (unsigned long long) bytes_to_read); 101 #endif 102 103 if (bytes_to_read == 0) { 104 break; 105 } 106 107 newsrc = NULL; 108 for (i = 0; i < tinfo->source_list_cnt; i++) { 109 if (!tinfo->source_list[i].active && tinfo->source_list[i].valid) { 110 if (tinfo->source_list[i].speed > tinfo->src->speed || tinfo->src->handle == NULL) { 111 if (newsrc) { 112 if (tinfo->source_list[i].speed > newsrc->speed) { 113 newsrc = &tinfo->source_list[i]; 114 } 115 } else { 116 newsrc = &tinfo->source_list[i]; 117 } 118 } 119 } 120 } 121 122 if (newsrc && newsrc != tinfo->src) { 123 printf("Selected new source: %s\n", newsrc->url); 124 download_end(tinfo->src); 125 tinfo->src = newsrc; 126 download_begin(tinfo->src, tinfo->start_offset + tinfo->bytes_read); 127 } 128 129 if (tinfo->src->handle) { 130 if (bytes_to_read > sizeof(buf)) { 131 bytes_to_read = sizeof(buf); 132 } 133 134 fread_ret = fread_net(buf, sizeof(buf[0]), bytes_to_read, tinfo->src->handle); 135 136 if (feof_net(tinfo->src->handle)) { 137 download_end(tinfo->src); 138 } 139 140 if (fread_ret == 0) { 141 continue; 142 } 143 144 #ifdef DEBUG 145 printf("Writing %i bytes at %llu (%f bytes/sec)\n", (int) fread_ret, (unsigned long long) (tinfo->start_offset + tinfo->bytes_read), tinfo->src->speed); 146 #endif 147 148 #ifndef __WIN32__ 149 fseeko(tinfo->fp, tinfo->start_offset + tinfo->bytes_read, SEEK_SET); 150 #else 151 fseek(tinfo->fp, tinfo->start_offset + tinfo->bytes_read, SEEK_SET); 152 #endif 153 fwrite_ret = fwrite(buf, sizeof(buf[0]), fread_ret, tinfo->fp); 154 if (fwrite_ret != fread_ret) { 155 fprintf(stderr, "Error writing to file.\n"); 156 break; 157 } 158 159 tinfo->bytes_read += fread_ret; 160 tinfo->src->bytes_read += fread_ret; 161 download_update_speed(tinfo->src); 162 } 163 } 164 165 if (tinfo->src) { 166 download_end(tinfo->src); 167 tinfo->src = NULL; 168 } 169 170 PT_END(ptr); 171 } 172 173 int main(int argc, char **argv) { 174 NETFILE *fp; 175 struct source_info *sinfo; 176 struct thread_info *tinfo; 177 struct pt *pt_info; 178 double total_speed; 179 off_t filesize = -1, sectionsize; 180 char *output_filename = NULL; 181 FILE *ofp; 182 int num_sources, idx; 183 int all_threads_done; 184 int i, thread_iterations; 185 186 num_sources = argc - 1; 187 188 if (num_sources == 0) { 189 printf("Usage: mirrorget <url1> [<url2> [<url3> [...]]]\n"); 190 return(EXIT_FAILURE); 191 } 192 193 sinfo = malloc(sizeof(*sinfo) * num_sources); 194 if (!sinfo) { 195 perror("malloc"); 196 return(EXIT_FAILURE); 197 } 198 199 tinfo = malloc(sizeof(*tinfo) * num_sources); 200 if (!tinfo) { 201 perror("malloc"); 202 return(EXIT_FAILURE); 203 } 204 205 pt_info = malloc(sizeof(*pt_info) * num_sources); 206 if (!pt_info) { 207 perror("malloc"); 208 return(EXIT_FAILURE); 209 } 210 211 output_filename = strrchr(argv[1], '/'); 212 if (output_filename) { 213 output_filename++; 214 } else { 215 output_filename = "mirrorget.out"; 216 } 217 218 ofp = fopen(output_filename, "wb"); 219 if (!ofp) { 220 perror("fopen"); 221 return(EXIT_FAILURE); 222 } 223 224 if (filesize == -1) { 225 /* Determine file size */ 226 fp = fopen_net(argv[1], "r"); 227 filesize = flength_net(fp); 228 fclose_net(fp); 229 if (filesize == -1) { 230 fprintf(stderr, "Unable to determine file size of \"%s\". Aborting.\n", argv[1]); 231 return(EXIT_FAILURE); 232 } 233 } 234 sectionsize = filesize / num_sources; 235 printf("File size = %llu; Section size = %llu\n", (unsigned long long) filesize, (unsigned long long) sectionsize); 236 237 for (idx = 0; idx < num_sources; idx++) { 238 i = idx + 1; 239 sinfo[idx].url = argv[i]; 240 sinfo[idx].speed = 0.0; 241 sinfo[idx].handle = NULL; 242 sinfo[idx].offset = 0; 243 sinfo[idx].valid = 1; 244 sinfo[idx].active = 1; 245 246 tinfo[idx].src = &sinfo[idx]; 247 tinfo[idx].fp = ofp; 248 tinfo[idx].start_offset = idx * sectionsize; 249 tinfo[idx].end_offset = (idx + 1) * sectionsize; 250 tinfo[idx].bytes_read = 0; 251 tinfo[idx].source_list = sinfo; 252 tinfo[idx].source_list_cnt = num_sources; 253 254 PT_INIT(&pt_info[idx]); 255 } 256 tinfo[num_sources - 1].end_offset = filesize; 257 258 thread_iterations = 0; 259 while (1) { 260 thread_iterations++; 261 all_threads_done = 1; 262 for (idx = 0; idx < num_sources; idx++) { 263 if (PT_SCHEDULE(download_thread(&pt_info[idx], &tinfo[idx]))) { 264 all_threads_done = 0; 265 266 total_speed += tinfo[idx].src->speed; 267 } 268 } 269 270 if (all_threads_done) { 271 break; 272 } 273 274 if ((thread_iterations % 100) == 0) { 275 total_speed = 0.0; 276 for (idx = 0; idx < num_sources; idx++) { 277 if (sinfo[idx].active) { 278 total_speed += sinfo[idx].speed; 279 } 280 } 281 printf("Curr speed: %f MB/sec\n", (total_speed / (1024.0 * 1024.0))); 282 } 283 } 284 285 return(EXIT_SUCCESS); 286 } |